diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java index 553e0eb65..180bdae78 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java @@ -12,6 +12,7 @@ public interface ErrorCodeConstants { ErrorCode DEVICE_NOT_EXISTS = new ErrorCode(1_003_000_000, "设备不存在"); ErrorCode DEVICE_REFERENCES_EXIST = new ErrorCode(1_003_000_000, "存在设备已被引用,请先删除引用。"); ErrorCode DEVICE_MQTT_TOPIC_EXIST = new ErrorCode(1_003_000_000, "设备MQTT主题不存在。"); + ErrorCode DEVICE_MQTT_TOPIC_ALREADY_EXIST = new ErrorCode(1_003_000_000, "该MQTT主题已被其他设备订阅,一台设备只能绑定一个主题。"); ErrorCode DEVICE_EXISTS = new ErrorCode(1_003_000_000, "同名或同主题设备已存在"); @@ -78,4 +79,11 @@ public interface ErrorCodeConstants { ErrorCode RECIPE_PLAN_DETAIL_CODE_EXISTS = new ErrorCode(1_003_000_006, "编码已存在"); ErrorCode RECIPE_POINT_RECORD_NOT_EXISTS = new ErrorCode(1_003_000_007, "IoT配方点位记录不存在"); ErrorCode RECIPE_DEVICE_RECORD_NOT_EXISTS = new ErrorCode(1_003_000_008, "设备点位采集值记录不存在"); + + // ======================================= Tdengine ============================================ + + ErrorCode TABLE_CREATION_FAILED = new ErrorCode(1_004_000_008, "TDengine 表创建失败"); + ErrorCode COLOUMN_CREATION_FAILED = new ErrorCode(1_004_000_008, "TDengine 列创建失败"); + + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/DeviceController.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/DeviceController.java index 5042e62e5..6805f9961 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/DeviceController.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/DeviceController.java @@ -52,8 +52,8 @@ public class DeviceController { @Resource private JobService jobService; - @Resource - private OpcUaSubscriptionService opcUaService; +// @Resource +// private OpcUaSubscriptionService opcUaService; // @Resource // private IMqttservice mqttService; @@ -65,7 +65,7 @@ public class DeviceController { public CommonResult createDevice(@Valid @RequestBody DeviceSaveReqVO createReqVO) throws SchedulerException { DeviceDO device = deviceService.createDevice(createReqVO); //初始化Td表 - tDengineService.initDatabaseAndTable(device.getId()); +// tDengineService.initDatabaseAndTable(device.getId()); return success(device); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/OpcUaSubscriptionService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/OpcUaSubscriptionService.java deleted file mode 100644 index a4cfb7506..000000000 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/OpcUaSubscriptionService.java +++ /dev/null @@ -1,136 +0,0 @@ -package cn.iocoder.yudao.module.iot.controller.admin.device; - - -import lombok.extern.slf4j.Slf4j; -import org.eclipse.milo.opcua.sdk.client.OpcUaClient; -import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig; -import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider; -import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider; -import org.eclipse.milo.opcua.sdk.client.api.subscriptions.*; -import org.eclipse.milo.opcua.stack.client.DiscoveryClient; -import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy; -import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; -import org.eclipse.milo.opcua.stack.core.types.builtin.Variant; -import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger; -import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode; -import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn; -import org.eclipse.milo.opcua.stack.core.types.structured.*; -import org.springframework.stereotype.Service; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; - -import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint; - -@Slf4j -@Service -public class OpcUaSubscriptionService { - - private static final String URL = "opc.tcp://192.168.21.5:4840"; - private static final String USERNAME = "bst"; - private static final String PASSWORD = "Bst123456"; - - private OpcUaClient client; - private UaSubscription subscription; - - // 保存已订阅的节点 - private final ConcurrentHashMap monitoredItems = new ConcurrentHashMap<>(); - - /** - * 订阅 OPC UA 节点变化 - */ - public void subscribeNode(String nodeAddress, int samplingMillis) throws Exception { - - if (client == null) { - connect(); - } - - if (subscription == null) { - subscription = client.getSubscriptionManager() - .createSubscription((double) samplingMillis) - .get(); - log.info("创建订阅,采样间隔: {} ms", samplingMillis); - } - - NodeId nodeId = NodeId.parse(nodeAddress); - - ReadValueId readValueId = new ReadValueId( - nodeId, - org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger.valueOf(13), // Value 属性 - null, - null - ); - - MonitoringParameters parameters = new MonitoringParameters( - uint(nodeId.hashCode()), - (double) samplingMillis, - null, - uint(10), - true - ); - - MonitoredItemCreateRequest request = new MonitoredItemCreateRequest( - readValueId, - MonitoringMode.Reporting, - parameters - ); - List requests = new ArrayList<>(); - requests.add(request); - List items = subscription.createMonitoredItems( - TimestampsToReturn.Both, - requests - ).get(); - - UaMonitoredItem item = items.get(0); - item.setValueConsumer((monitoredItem, value) -> { - Variant variant = value.getValue(); - Object v = variant != null ? variant.getValue() : null; - log.info("节点 {} 值变化: {}", nodeAddress, v); - }); - - monitoredItems.put(nodeAddress, item); - - log.info("成功订阅节点: {}", nodeAddress); - } - - private void connect() throws Exception { - IdentityProvider identity = new UsernameProvider(USERNAME, PASSWORD); - - // 1. 获取端点 - List endpoints = DiscoveryClient.getEndpoints(URL) - .get(5, TimeUnit.SECONDS); - - EndpointDescription endpoint = endpoints.stream() - .filter(e -> SecurityPolicy.None.getUri().equals(e.getSecurityPolicyUri())) - .findFirst() - .orElseThrow(() -> new RuntimeException("未找到 SecurityPolicy=None 的端点")); - - // 2. 构建客户端配置 - OpcUaClientConfig config = OpcUaClientConfig.builder() - .setEndpoint(endpoint) - .setIdentityProvider(identity) - .build(); - - // 3. 创建客户端并连接 - client = OpcUaClient.create(config); - client.connect().get(); - - log.info("成功连接 OPC UA 服务端: {}", URL); - } - -// /** -// * 断开连接 -// */ -// public void disconnect() { -// if (client != null) { -// try { -// client.disconnect().get(); -// log.info("断开 OPC UA 连接"); -// } catch (Exception e) { -// log.error("断开 OPC UA 连接失败", e); -// } -// } -// } -} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/enums/JavaToTdengineTypeEnum.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/enums/JavaToTdengineTypeEnum.java new file mode 100644 index 000000000..e2d595a1f --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/enums/JavaToTdengineTypeEnum.java @@ -0,0 +1,136 @@ +package cn.iocoder.yudao.module.iot.controller.admin.device.enums; + +/** + * Java 基本类型对应 TDengine 列类型枚举 + */ +public enum JavaToTdengineTypeEnum { + + BYTE("byte", "TINYINT") { + @Override + public Object convertValue(String value) { + return Byte.parseByte(value); + } + }, + + SHORT("short", "SMALLINT") { + @Override + public Object convertValue(String value) { + return Short.parseShort(value); + } + }, + + INT("int", "INT") { + @Override + public Object convertValue(String value) { + return Integer.parseInt(value); + } + }, + + LONG("long", "BIGINT") { + @Override + public Object convertValue(String value) { + return Long.parseLong(value); + } + }, + + FLOAT("float", "FLOAT") { + @Override + public Object convertValue(String value) { + return Float.parseFloat(value); + } + }, + + DOUBLE("double", "DOUBLE") { + @Override + public Object convertValue(String value) { + return Double.parseDouble(value); + } + }, + + BOOLEAN("boolean", "BOOL") { + @Override + public Object convertValue(String value) { + return Boolean.parseBoolean(value); + } + }, + + CHAR("char", "NCHAR(1)") { + @Override + public Object convertValue(String value) { + return value.charAt(0); + } + }; + + private final String javaType; + private final String tdType; + + JavaToTdengineTypeEnum(String javaType, String tdType) { + this.javaType = javaType; + this.tdType = tdType; + } + + public String getJavaType() { + return javaType; + } + + public String getTdType() { + return tdType; + } + + /** + * 每个枚举实现自己的值转换 + */ + public abstract Object convertValue(String value); + + /** + * 根据 javaType 获取枚举 + */ + public static JavaToTdengineTypeEnum fromJavaType(String javaType) { + + if (javaType == null) { + return null; + } + + for (JavaToTdengineTypeEnum e : values()) { + + if (e.javaType.equalsIgnoreCase(javaType)) { + return e; + } + } + + return null; + } + + /** + * 根据 javaType 获取 TDengine 类型 + */ + public static String getTdTypeByJavaType(String javaType) { + + JavaToTdengineTypeEnum e = fromJavaType(javaType); + + return e != null ? e.getTdType() : null; + } + + /** + * 根据 javaType 转换 value + */ + public static Object convertValue(String javaType, String value) { + + try { + + JavaToTdengineTypeEnum e = fromJavaType(javaType); + + if (e == null) { + return Double.parseDouble(value); + } + + return e.convertValue(value); + + } catch (Exception ex) { + + throw new RuntimeException( + "数据类型转换失败: javaType=" + javaType + ", value=" + value, ex + ); + } + } +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/opcuv/OpcShutdown.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/opcuv/OpcShutdown.java deleted file mode 100644 index ad35378ad..000000000 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/opcuv/OpcShutdown.java +++ /dev/null @@ -1,14 +0,0 @@ -package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.opcuv; - -import cn.iocoder.yudao.framework.common.util.opc.OpcUtils; -import org.springframework.stereotype.Component; - -import javax.annotation.PreDestroy; - -@Component -public class OpcShutdown { - @PreDestroy - public void shutdown() { - OpcUtils.disconnectAll(); - } -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/utils/DataTypeParseUtil.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/utils/DataTypeParseUtil.java new file mode 100644 index 000000000..0f1cc41e2 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/utils/DataTypeParseUtil.java @@ -0,0 +1,162 @@ +package cn.iocoder.yudao.module.iot.controller.admin.device.utils; + +import lombok.extern.slf4j.Slf4j; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.time.LocalDateTime; + +/** + * 数据类型解析工具类(静态工具类) + * + * 支持 TDengine / MQTT / OPC / JDBC 数据类型转换 + */ +@Slf4j +public final class DataTypeParseUtil { + + /** + * 私有构造函数,防止实例化 + */ + private DataTypeParseUtil() { + throw new UnsupportedOperationException("Utility class cannot be instantiated"); + } + + /** + * 根据 dataType 解析 value + * + * @param value 原始值 + * @param dataType 数据类型 + * @return 转换后的值 + */ + public static Object parse(Object value, String dataType) { + + if (value == null || dataType == null) { + return null; + } + + try { + + switch (dataType.toLowerCase()) { + + case "int": + case "integer": + return toInt(value); + + case "long": + return toLong(value); + + case "double": + return toDouble(value); + + case "float": + return toFloat(value); + + case "short": + return toShort(value); + + case "boolean": + return toBoolean(value); + + case "string": + case "varchar": + case "text": + return value.toString(); + + case "bigdecimal": + return toBigDecimal(value); + + case "timestamp": + case "datetime": + return toLocalDateTime(value); + + default: + return value; + } + + } catch (Exception e) { + + log.warn("数据类型解析失败 value={}, dataType={}", value, dataType); + + return value; + } + } + + public static Integer toInt(Object value) { + + if (value instanceof Number) { + return ((Number) value).intValue(); + } + + return Integer.parseInt(value.toString()); + } + + public static Long toLong(Object value) { + + if (value instanceof Number) { + return ((Number) value).longValue(); + } + + return Long.parseLong(value.toString()); + } + + public static Double toDouble(Object value) { + + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + + return Double.parseDouble(value.toString()); + } + + public static Float toFloat(Object value) { + + if (value instanceof Number) { + return ((Number) value).floatValue(); + } + + return Float.parseFloat(value.toString()); + } + + public static Short toShort(Object value) { + + if (value instanceof Number) { + return ((Number) value).shortValue(); + } + + return Short.parseShort(value.toString()); + } + + public static Boolean toBoolean(Object value) { + + if (value instanceof Boolean) { + return (Boolean) value; + } + + String str = value.toString().toLowerCase(); + + return "true".equals(str) || "1".equals(str); + } + + public static BigDecimal toBigDecimal(Object value) { + + if (value instanceof BigDecimal) { + return (BigDecimal) value; + } + + return new BigDecimal(value.toString()); + } + + public static LocalDateTime toLocalDateTime(Object value) { + + if (value instanceof Timestamp) { + return ((Timestamp) value).toLocalDateTime(); + } + + if (value instanceof LocalDateTime) { + return (LocalDateTime) value; + } + + return LocalDateTime.parse(value.toString()); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/recipedevicerecord/RecipeDeviceRecordController.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/recipedevicerecord/RecipeDeviceRecordController.java index 75fe0c168..e3a4e86e5 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/recipedevicerecord/RecipeDeviceRecordController.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/recipedevicerecord/RecipeDeviceRecordController.java @@ -148,7 +148,6 @@ public class RecipeDeviceRecordController { /** * 批量创建设备点位采集记录和配方点位记录 - * @param recipeId 配方ID * @return 创建结果 * @throws JsonProcessingException JSON处理异常 */ @@ -198,6 +197,8 @@ public class RecipeDeviceRecordController { Map> deviceDataMap = deviceService.createDeviceDataMap(device.getId());//recipeRespVO.getDeviceId() + Map map = tDengineService.newSelectLatestRow(device.getId()); + // OpcUtils.connect(device.getId(),device.getUrl(),device.getUsername(),device.getPassword(),10); for (RecipeDeviceAttributeDO attributeDO : attributeList) { @@ -216,9 +217,10 @@ public class RecipeDeviceRecordController { recipeDeviceRecordDO.setDeviceId(deviceContactModelDO.getDeviceId()); recipeDeviceRecordDO.setDataUnit(deviceContactModelDO.getDataUnit()); // recipeDeviceRecordDO.setValue((String) OpcUtils.readValues(device.getId(),deviceContactModelDO.getAddress())); - if (data.get("addressValue") != null && data.get("addressValue").toString() != null) { - recipeDeviceRecordDO.setValue(data.get("addressValue").toString()); - } +// if (data.get("addressValue") != null && data.get("addressValue").toString() != null) { +// recipeDeviceRecordDO.setValue(data.get("addressValue").toString()); +// } + recipeDeviceRecordDO.setValue(map.get(deviceContactModelDO.getAttributeCode())); recipeDeviceRecordService.createRecipeDeviceRecord(BeanUtils.toBean(recipeDeviceRecordDO, RecipeDeviceRecordSaveReqVO.class)); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/recipedevicerecord/RecipeDeviceRecordDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/recipedevicerecord/RecipeDeviceRecordDO.java index 9c02b2c5a..1e2a1f9b4 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/recipedevicerecord/RecipeDeviceRecordDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/recipedevicerecord/RecipeDeviceRecordDO.java @@ -66,7 +66,7 @@ public class RecipeDeviceRecordDO extends BaseDO { /** * 采集值 */ - private String value; + private Object value; /** * 配方id diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/DeviceMapper.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/DeviceMapper.java index bced60a98..e5f013f96 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/DeviceMapper.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/DeviceMapper.java @@ -12,12 +12,14 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceDO; import com.alibaba.excel.util.StringUtils; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import org.apache.ibatis.annotations.MapKey; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** @@ -103,6 +105,8 @@ public interface DeviceMapper extends BaseMapperX { List deviceLedgerList(); + List> selectWorkshopBatch(@Param("deviceIds") List deviceIds); + List lineDeviceList(@Param("pageReqVO") LineDeviceRequestVO pageReqVO); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/MqttDataHandler.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/MqttDataHandler.java index a465b6e2b..0fff28503 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/MqttDataHandler.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/consumer/MqttDataHandler.java @@ -381,9 +381,11 @@ public class MqttDataHandler extends SuperConsumer { deviceContactModelDO.setAddress(null); } String json = JSON.toJSONString(dataList); - boolean inserted = tDengineService.insertDeviceData(deviceId, json); +// boolean inserted = tDengineService.insertDeviceData(deviceId, json); - if (inserted) { + boolean isSuccess = tDengineService.newInsertDeviceData(deviceId, dataList); + + if (isSuccess) { log.info("设备 {} 数据入库成功,总数: {},有效: {}", deviceId, dataList.size(), successCount); } else { diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java index 22d67ecb3..6e37695d6 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java @@ -9,6 +9,7 @@ import cn.iocoder.yudao.framework.common.util.opc.OpcUtils; import cn.iocoder.yudao.module.iot.controller.admin.device.enums.DeviceStatusEnum; import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.utils.CronExpressionUtils; import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.scheduler.TaskSchedulerManager; +import cn.iocoder.yudao.module.iot.controller.admin.device.utils.DataTypeParseUtil; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.*; import cn.iocoder.yudao.module.iot.controller.admin.devicecontactmodel.vo.DeviceContactModelPageReqVO; import cn.iocoder.yudao.module.iot.controller.admin.mqttdatarecord.vo.MqttDataRecordPageReqVO; @@ -60,6 +61,7 @@ import javax.validation.constraints.NotNull; import java.sql.Timestamp; import java.text.SimpleDateFormat; import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.*; import java.util.stream.Collectors; @@ -158,9 +160,13 @@ public class DeviceServiceImpl implements DeviceService { //新增模板点位 LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); - lambdaQueryWrapper.eq(DeviceModelAttributeDO::getDeviceModelId,createReqVO.getDeviceModelId()); + lambdaQueryWrapper.eq(DeviceModelAttributeDO::getDeviceModelId,createReqVO.getDeviceModelId()).orderByDesc(DeviceModelAttributeDO::getId); List deviceModelAttributeDOS = deviceModelAttributeMapper.selectList(lambdaQueryWrapper); + if (deviceModelAttributeDOS.isEmpty()){ + throw exception(DEVICE_MODEL_ATTRIBUTE_NOT_EXISTS); + } + List contactModelList = new ArrayList<>(); for (DeviceModelAttributeDO attributeDO : deviceModelAttributeDOS) { DeviceContactModelDO contactModel = new DeviceContactModelDO(); @@ -228,11 +234,21 @@ public class DeviceServiceImpl implements DeviceService { public void updateDevice(DeviceSaveReqVO updateReqVO) { // 校验存在 validateDeviceExists(updateReqVO.getId()); + //校驗topic是否唯一 + validTopicExists(updateReqVO.getTopic()); + // 更新 DeviceDO updateObj = BeanUtils.toBean(updateReqVO, DeviceDO.class); deviceMapper.updateById(updateObj); } + private void validTopicExists(String topic) { + boolean exists = deviceMapper.exists(Wrappers.lambdaQuery().eq(DeviceDO::getTopic, topic)); + if (exists){ + throw exception(DEVICE_MQTT_TOPIC_ALREADY_EXIST); + } + } + @Override @Transactional(rollbackFor = Exception.class) public void deleteDevice(List ids) { @@ -305,7 +321,7 @@ public class DeviceServiceImpl implements DeviceService { .collect(Collectors.toList()); // 2. 批量获取 TDengine 最新 ts - Map latestTsMap = tdengineService.selectLatestTsBatch(deviceIds); + Map latestTsMap = tdengineService.newSelectLatestTsBatch(deviceIds); // 3. 批量获取最新的 DeviceOperationRecord List ruleCodes = Arrays.stream(DeviceStatusEnum.values()) @@ -378,18 +394,70 @@ public class DeviceServiceImpl implements DeviceService { PageResult deviceModelAttributeDOPageResult = deviceContactModelMapper.selectPageById(pageReqVO, deviceModelAttributePageReqVO); - Map> deviceDataMap = createDeviceDataMap(device.getId()); +// Map> deviceDataMap = createDeviceDataMap(device.getId()); + + //获取td数据库最新一条数据 + Map latestRow = tdengineService.newSelectLatestRow(device.getId()); + - // 合并数据:将 deviceDataMap 的值赋给分页结果中的对应记录 List records = deviceModelAttributeDOPageResult.getList(); - for (DeviceContactModelDO record : records) { - Map data = deviceDataMap.get(record.getId()); - if (data != null) { - record.setAddressValue(adjustByRatio(data.get("addressValue"), record.getRatio())); // 设置 addressValue - record.setLatestCollectionTime((String) data.get("timestamp")); // 设置 latestCollectionTime + + if (latestRow != null && !latestRow.isEmpty()) { + + // 1. 取 ts 时间 + Object tsObj = latestRow.get("ts"); + String latestTime = null; + + if (tsObj instanceof Timestamp) { + latestTime = ((Timestamp) tsObj).toLocalDateTime().toString(); + } else if (tsObj instanceof LocalDateTime) { + latestTime = tsObj.toString(); + } else if (tsObj != null) { + latestTime = tsObj.toString(); } + + + // 2. 遍历 records,匹配列名(attributeCode) + for (DeviceContactModelDO record : records) { + + String attributeCode = record.getAttributeCode(); + + if (attributeCode == null) { + continue; + } + + // 从 latestRow 获取对应列值 + Object rawValue = latestRow.get(attributeCode); + + if (rawValue == null) { + continue; + } + + // 根据 dataType 解析 + Object parsedValue = DataTypeParseUtil.parse(rawValue, record.getDataType()); + + // 按比例调整 + Object finalValue = adjustByRatio(parsedValue, record.getRatio()); + + // 设置值 + record.setAddressValue(finalValue); + + // 使用最新 ts 设置最新采集时间 + record.setLatestCollectionTime(latestTime); + } + } +// // 合并数据:将 deviceDataMap 的值赋给分页结果中的对应记录 +// List records = deviceModelAttributeDOPageResult.getList(); +// for (DeviceContactModelDO record : records) { +// Map data = deviceDataMap.get(record.getId()); +// if (data != null) { +// record.setAddressValue(adjustByRatio(data.get("addressValue"), record.getRatio())); // 设置 addressValue +// record.setLatestCollectionTime((String) data.get("timestamp")); // 设置 latestCollectionTime +// } +// } + return deviceModelAttributeDOPageResult; } @@ -655,40 +723,89 @@ public class DeviceServiceImpl implements DeviceService { @Override public PageResult lineDevicePage(LineDeviceRequestVO pageReqVO) { -// Page page = new Page<>(pageReqVO.getPageNo(), pageReqVO.getPageSize()); + // 1. 查询分页设备 + PageResult pageResult = + getDevicePage(BeanUtils.toBean(pageReqVO, DevicePageReqVO.class)); -// IPage lineDeviceRespVO = deviceMapper.lineDevicePage(page,pageReqVO); - PageResult pageResult = getDevicePage(BeanUtils.toBean(pageReqVO, DevicePageReqVO.class)); + List deviceList = + Optional.ofNullable(pageResult.getList()) + .orElse(Collections.emptyList()); - List list = new ArrayList<>(); - for (DeviceRespVO record : pageResult.getList()) { - LineDeviceRespVO lineDeviceRespVO = new LineDeviceRespVO(); - lineDeviceRespVO.setDeviceCode(record.getDeviceCode()); - lineDeviceRespVO.setDeviceName(record.getDeviceName()); - lineDeviceRespVO.setStatus(record.getStatus()); - lineDeviceRespVO.setCollectionTime(String.valueOf(record.getCollectionTime())); - lineDeviceRespVO.setId(record.getId()); - Map latestDeviceData = tdengineService.getLatestDeviceData(record.getId()); - if(latestDeviceData != null) { - lineDeviceRespVO.setCollectionTime((String) latestDeviceData.get("timestamp")); - } - String line = deviceMapper.lineDeviceLedgerPage(record.getId()); - if (line != null) { - lineDeviceRespVO.setLineName(line); - lineDeviceRespVO.setLineNode(line); - } else { - lineDeviceRespVO.setLineName("-"); - lineDeviceRespVO.setLineNode("-"); - } - list.add(lineDeviceRespVO); - } - if (list.isEmpty()) { - return PageResult.empty(); // 返回空Page + if (deviceList.isEmpty()) { + return PageResult.empty(); } - return new PageResult<>(list, pageResult.getTotal()); + + + // 2. 提取 deviceIds + List deviceIds = + deviceList.stream() + .map(DeviceRespVO::getId) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + +// // 批量查 workshop +// List> mapList = deviceMapper.selectWorkshopBatch(deviceIds); +// +// Map workshopMap = mapList.stream() +// .filter(m -> m.get("deviceId") != null) // 避免 deviceId null +// .collect(Collectors.toMap( +// m -> ((Number) m.get("deviceId")).longValue(), +// m -> m.get("workshop") != null ? m.get("workshop").toString() : "", +// (existing, replacement) -> replacement +// )); + + // 3. 批量查询最新采集时间(TDengine) + Map latestTsMap = + tdengineService.newSelectLatestTsBatch(deviceIds); + + // 4. 时间格式化器 + DateTimeFormatter formatter = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + // 5. 转换结果(使用 stream 更简洁) + List list = + deviceList.stream() + .map(device -> { + + LineDeviceRespVO vo = new LineDeviceRespVO(); + + vo.setId(device.getId()); + vo.setDeviceCode(device.getDeviceCode()); + vo.setDeviceName(device.getDeviceName()); + vo.setStatus(device.getStatus()); + + // 优先使用 TDengine 最新时间 + LocalDateTime latestTs = + latestTsMap.get(device.getId()); + + if (latestTs != null) { + vo.setCollectionTime(latestTs.format(formatter)); + } else if (device.getCollectionTime() != null) { + vo.setCollectionTime( + device.getCollectionTime() + .format(formatter) + ); + } else { + vo.setCollectionTime(null); + } + + // 查询产线名称 + vo.setLineName( + deviceMapper.lineDeviceLedgerPage(device.getId()) + ); +// vo.setLineName(workshopMap.get(device.getId())); + + + return vo; + }) + .collect(Collectors.toList()); + + // 6. 返回分页结果 + return new PageResult<>(list, pageResult.getTotal()); } + @Override public List lineDeviceList(LineDeviceRequestVO pageReqVO) { @@ -706,168 +823,234 @@ public class DeviceServiceImpl implements DeviceService { } @Override - public Map>> singleDevice(Long deviceId) throws JsonProcessingException { + public Map>> singleDevice(Long deviceId) { Map>> resultMap = new LinkedHashMap<>(); + List records = deviceContactModelMapper.selectList(Wrappers.lambdaQuery() + .eq(DeviceContactModelDO::getDeviceId,deviceId) + .orderByDesc(DeviceContactModelDO::getId)); + + if (records == null || records.isEmpty()) { + return resultMap; + } + try { - // 1. 获取设备数据列表 - List> deviceDataList = tdengineService.getNewestDeviceDataOrderByTimeDesc(deviceId); + // 获取最新一行数据 + Map latestRow = tdengineService.newSelectLatestRow(deviceId); + if (latestRow == null || latestRow.isEmpty()) { + return resultMap; + } + +// // 获取 ts 时间 +// Object tsObj = latestRow.get("ts"); +// String latestTime = tsObj instanceof Timestamp ? ((Timestamp) tsObj).toLocalDateTime().toString() +// : tsObj != null ? tsObj.toString() : null; - // 2. 获取属性类型映射 + // 属性类型映射 Map idToNameMap = deviceAttributeTypeMapper.selectList() .stream() .collect(Collectors.toMap( DeviceAttributeTypeDO::getId, DeviceAttributeTypeDO::getName )); + Map attributeTypeCache = new HashMap<>(); - // 3. 遍历并处理 - for (Map deviceData : deviceDataList) { - String queryDataJson = (String) deviceData.get("queryData"); + // 遍历 records 填充数据 + for (DeviceContactModelDO record : records) { - if (StringUtils.isNotBlank(queryDataJson)) { - // 使用TypeReference解析为List而不是具体的DO对象 - List> dataList = new ObjectMapper().readValue( - queryDataJson, - new TypeReference>>() {} - ); + String attributeCode = record.getAttributeCode(); + if (attributeCode == null) continue; - for (Map data : dataList) { - // 获取属性类型名称 - String attributeTypeName = "其他"; - String typeStr = (String) data.get("attributeType"); - if (typeStr != null) { - try { - attributeTypeName = typeStr; - } catch (Exception e) { - attributeTypeName = "未知"; - } - } + Object rawValue = latestRow.get(attributeCode); +// if (rawValue == null) continue; + + Object finalValue = DataTypeParseUtil.parse(rawValue, record.getDataType()); +// Object finalValue = adjustByRatio(finalValueparsedValue, record.getRatio()); - // 提取需要的字段 - Map simplifiedData = new HashMap<>(); - simplifiedData.put("addressValue", data.get("addressValue")); - simplifiedData.put("attributeName", data.get("attributeName")); + record.setAddressValue(finalValue); +// record.setLatestCollectionTime(latestTime); - resultMap - .computeIfAbsent(attributeTypeName, k -> new ArrayList<>()) - .add(simplifiedData); + // 属性类型名称 + String attributeTypeName = "其他"; + Object typeObj = record.getAttributeType(); + if (typeObj != null) { + String typeStr = typeObj.toString(); + try { + Long typeId = Long.parseLong(typeStr); + attributeTypeName = attributeTypeCache.computeIfAbsent(typeId, + k -> idToNameMap.getOrDefault(typeId, typeStr)); + } catch (NumberFormatException e) { + attributeTypeName = typeStr; } } + + Map simplifiedData = new HashMap<>(); + simplifiedData.put("addressValue", record.getAddressValue()); + simplifiedData.put("attributeName", record.getAttributeName()); + + resultMap.computeIfAbsent(attributeTypeName, k -> new ArrayList<>()).add(simplifiedData); } } catch (Exception e) { - System.out.println("处理设备数据时发生异常: " + e.getMessage()); + log.warn("处理设备 {} 最新数据时异常: {}", deviceId, e.getMessage()); } return resultMap; } + @Override - public PageResult> historyRecord(Long deviceId, String collectionStartTime, String collectionEndTime,Integer page, - Integer pageSize) { + public PageResult> historyRecord( + Long deviceId, + String collectionStartTime, + String collectionEndTime, + Integer page, + Integer pageSize) { + List> resultList = new ArrayList<>(); + if (deviceId == null) { - return null; + return PageResult.empty(); } try { - // 1. 获取设备数据列表 - List> deviceDataList = tdengineService.getstDeviceDataOrderByTimeDescPage( - deviceId, collectionStartTime, collectionEndTime, page, pageSize - ); - long total = tdengineService.queryDeviceDataTotal(deviceId, collectionStartTime, collectionEndTime); + // 1. 查询TDengine分页数据 + List> deviceDataList = + tdengineService.newSelectBatchPage( + deviceId, + collectionStartTime, + collectionEndTime, + page, + pageSize + ); + long total = + tdengineService.newSelectBatchTotal( + deviceId, + collectionStartTime, + collectionEndTime + ); if (deviceDataList.isEmpty()) { - return null; + return new PageResult<>(resultList, total); } - // 2. 获取属性类型映射 (ID -> Name) 并构建缓存,避免重复数据库查询 - Map idToNameMap = deviceAttributeTypeMapper.selectList() - .stream() - .collect(Collectors.toMap(DeviceAttributeTypeDO::getId, DeviceAttributeTypeDO::getName)); + // 2. 查询属性配置(完整字段来源) + List attributeList = + deviceContactModelMapper.selectList( + Wrappers.lambdaQuery(DeviceContactModelDO.class) + .eq(DeviceContactModelDO::getDeviceId, deviceId) + .orderByAsc(DeviceContactModelDO::getSort) + ); - // 额外缓存:数字ID -> 名称,防止同一个ID多次查询数据库 - Map attributeTypeCache = new HashMap<>(); + // 时间格式化器 + DateTimeFormatter formatter = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - ObjectMapper objectMapper = new ObjectMapper(); + // 3. 遍历TDengine数据 + for (Map row : deviceDataList) { - // 3. 遍历每条设备数据 - for (Map deviceData : deviceDataList) { - String queryDataJson = (String) deviceData.get("queryData"); - Timestamp timestamp = (Timestamp) deviceData.get("timestamp"); + Object tsObj = row.get("ts"); - if (StringUtils.isBlank(queryDataJson) || timestamp == null) { - continue; // 跳过无效数据 + if (tsObj == null) { + continue; } - List> dataList = objectMapper.readValue( - queryDataJson, new TypeReference>>() {} - ); + // 格式化时间 + String collectTime; - // 4. 按属性类型分组 - Map>> groupedData = new LinkedHashMap<>(); + if (tsObj instanceof LocalDateTime) { - for (Map data : dataList) { - String attributeTypeName = "其他"; - Object typeObj = data.get("attributeType"); + collectTime = + ((LocalDateTime) tsObj) + .format(formatter); - if (typeObj != null) { - String typeStr = typeObj.toString(); + } else if (tsObj instanceof Timestamp) { - try { - // 尝试当作数字 ID - Long typeId = Long.parseLong(typeStr); + collectTime = + ((Timestamp) tsObj) + .toLocalDateTime() + .format(formatter); - // 先从缓存查 - if (attributeTypeCache.containsKey(typeId)) { - attributeTypeName = attributeTypeCache.get(typeId); - } else { - // 查数据库 - String nameFromDb = idToNameMap.get(typeId); - if (nameFromDb != null) { - attributeTypeName = nameFromDb; - } else { - attributeTypeName = typeStr; // 查不到就用原值 - } - attributeTypeCache.put(typeId, attributeTypeName); - } - } catch (NumberFormatException e) { - // 不是数字,直接用原字符串 - attributeTypeName = typeStr; - } + } else { + + collectTime = tsObj.toString(); + } + + // 按 typeName 分组 + Map>> groupedData = + new LinkedHashMap<>(); + + // 核心逻辑:遍历 attributeList(保证全部字段返回) + for (DeviceContactModelDO attribute : attributeList) { + + String column = + attribute.getAttributeCode(); + + Object rawValue = + row.get(column); + + // 使用 typeName 分组(核心优化点) + String typeName = + attribute.getTypeName(); + + if (typeName == null || typeName.isEmpty()) { + typeName = "其他"; } - // 构建属性数据 Map - Map simplifiedData = new HashMap<>(); - simplifiedData.put("addressValue", data.get("addressValue")); - simplifiedData.put("attributeName", data.get("attributeName")); + Map simplifiedData = + new HashMap<>(); - groupedData.computeIfAbsent(attributeTypeName, k -> new ArrayList<>()) + simplifiedData.put( + "addressValue", + rawValue != null + ? rawValue.toString() + : null + ); + + simplifiedData.put( + "attributeName", + attribute.getAttributeName() + ); + + groupedData + .computeIfAbsent( + typeName, + k -> new ArrayList<>() + ) .add(simplifiedData); } - // 5. 构建时间点 Map - Map timePointData = new LinkedHashMap<>(groupedData); - timePointData.put("collectTime", timestamp.toString()); + // 构建最终结构(保持旧结构完全一致) + Map timePointData = + new LinkedHashMap<>(groupedData); + + timePointData.put( + "collectTime", + collectTime + ); resultList.add(timePointData); } + return new PageResult<>(resultList, total); } catch (Exception e) { - log.error("处理设备数据时发生异常", e); - return PageResult.empty(); + log.error("处理设备历史数据异常", e); + + return PageResult.empty(); } } + + + private void validateDeviceAttributeExists(Long id) { if (deviceAttributeMapper.selectById(id) == null) { throw exception(DEVICE_ATTRIBUTE_NOT_EXISTS); @@ -1282,13 +1465,13 @@ public class DeviceServiceImpl implements DeviceService { log.info("新增gateway订阅记录成功 topic={}", topic); } else { - + //更新gateway状态 gateway.setIsEnable(true); gateway.setUpdateTime(LocalDateTime.now()); gatewayMapper.updateById(gateway); - log.info("更新gateway启用状态 topic={}", topic); + } } @@ -1314,6 +1497,12 @@ public class DeviceServiceImpl implements DeviceService { ); log.info("gateway订阅记录已禁用 topic={}", topic); + + //更新设备运行状态为离线 + updateOperationalStatus(deviceDO); + log.info("更新设备运行状态为离线 deviceId={}", deviceDO.getId()); + + } } catch (Exception e) { diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/TDengineService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/TDengineService.java index 02893aa98..84e32f2d9 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/TDengineService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/TDengineService.java @@ -1,6 +1,8 @@ package cn.iocoder.yudao.module.iot.service.device; import cn.iocoder.yudao.framework.common.pojo.DeviceEdgeData; +import cn.iocoder.yudao.module.iot.controller.admin.device.enums.JavaToTdengineTypeEnum; +import cn.iocoder.yudao.module.iot.dal.dataobject.devicecontactmodel.DeviceContactModelDO; import com.alibaba.fastjson.JSON; import com.baomidou.dynamic.datasource.annotation.DS; import com.fasterxml.jackson.core.type.TypeReference; @@ -15,7 +17,9 @@ import org.springframework.jdbc.core.RowMapper; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import javax.validation.constraints.NotNull; import java.nio.charset.StandardCharsets; +import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; @@ -26,6 +30,10 @@ import java.time.format.DateTimeFormatter; import java.util.*; import java.util.stream.Collectors; +import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; +import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.COLOUMN_CREATION_FAILED; +import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.TABLE_CREATION_FAILED; + @Service @Slf4j public class TDengineService { @@ -137,7 +145,7 @@ public class TDengineService { jdbcTemplate.execute("USE besure"); String tableName = "d_" + id; - String sql = "INSERT INTO " + tableName + " (ts, query_data) VALUES (?, ?)"; + String sql = "INSERT INTO besure." + tableName + " (ts, query_data) VALUES (?, ?)"; return jdbcTemplate.update(sql, ps -> { ps.setTimestamp(1, timestamp); @@ -725,4 +733,451 @@ public class TDengineService { } + //=================================修改数据后新增方法=============================== + + + @DS("tdengine") + public void createTdengineTable(Long deviceId, List contactModelList) { + if (deviceId == null) { + return; + } + + // 1. 表名 + String tableName = "d_" + deviceId; + + // 2. 构建列SQL,TDengine必须有 ts + StringBuilder columnsSql = new StringBuilder("ts TIMESTAMP"); + + // 3. 遍历 contactModelList,如果为空则不执行循环,表只含 ts + if (contactModelList != null && !contactModelList.isEmpty()) { + for (DeviceContactModelDO contact : contactModelList) { + String attributeCode = contact.getAttributeCode(); + String dataType = contact.getDataType(); + + if (attributeCode == null || dataType == null) { + continue; + } + + // 使用枚举获取 TDengine 类型 + String tdType = JavaToTdengineTypeEnum.getTdTypeByJavaType(dataType); + if (tdType == null) { + tdType = "DOUBLE"; // 默认使用 DOUBLE + } + + // 拼接列 + columnsSql.append(", ").append(attributeCode).append(" ").append(tdType); + } + } + + // 4. 构建完整 SQL + String createSql = "CREATE TABLE IF NOT EXISTS besure_server." + tableName + " (" + + columnsSql.toString() + + ")"; + + // 5. 使用 JdbcTemplate 执行 SQL + try { + jdbcTemplate.execute(createSql); + log.info("TDengine 表创建成功: {}", tableName); + } catch (Exception e) { + log.error("TDengine 表创建失败: {}", tableName, e); + throw exception(TABLE_CREATION_FAILED); + } + } + + + /** + * 新增td数据列 + * @param deviceId + * @param deviceContactModel + */ + @DS("tdengine") + public void AddTDDatabaseColumn(Long deviceId, DeviceContactModelDO deviceContactModel) { + + if (deviceId == null || deviceContactModel == null) { + return; + } + + String attributeCode = deviceContactModel.getAttributeCode(); + String dataType = deviceContactModel.getDataType(); + + if (attributeCode == null || dataType == null) { + return; + } + + // 表名 + String tableName = "besure_server.d_" + deviceId; + + // Java类型 -> TDengine类型 + String tdType = JavaToTdengineTypeEnum.getTdTypeByJavaType(dataType); + if (tdType == null) { + tdType = "DOUBLE"; + } + + // ALTER TABLE SQL + String alterSql = "ALTER TABLE " + tableName + + " ADD COLUMN " + attributeCode + " " + tdType; + + try { + jdbcTemplate.execute(alterSql); + log.info("TDengine 表新增列成功: table={}, column={}, type={}", + tableName, attributeCode, tdType); + + } catch (Exception e) { + + // TDengine 如果列已存在会报错,这里可以忽略 +// if (e.getMessage() != null && e.getMessage().contains("duplicated column name")) { +// log.warn("列已存在,跳过: table={}, column={}", tableName, attributeCode); +// return; +// } + + log.error("TDengine 表新增列失败: table={}, column={}", tableName, attributeCode, e); + throw exception(COLOUMN_CREATION_FAILED); + } + } + + /** + * 插入数据 + * @param deviceId + * @param dataList + * @return + */ + @DS("tdengine") + public boolean newInsertDeviceData(Long deviceId, List dataList) { + + if (deviceId == null || dataList == null || dataList.isEmpty()) { + return false; + } + + // 表名 + String tableName = "besure_server.d_" + deviceId; + + // 列名构建 + StringBuilder columnBuilder = new StringBuilder("ts"); + + // 值占位符 + StringBuilder valueBuilder = new StringBuilder("?"); + + // 参数列表 + List params = new ArrayList<>(); + + // TDengine 必须字段 + params.add(new Timestamp(System.currentTimeMillis())); + + for (DeviceContactModelDO model : dataList) { + + if (model == null) { + continue; + } + + String column = model.getAttributeCode(); + Object valueObj = model.getAddressValue(); + String javaType = model.getDataType(); + + // 1. 基础校验 + if (column == null || column.trim().isEmpty()) { + continue; + } + + if (javaType == null || javaType.trim().isEmpty()) { + javaType = "double"; + } + + // 防止 SQL 注入(只允许字母数字下划线) + if (!column.matches("^[a-zA-Z0-9_]+$")) { + log.warn("非法列名,跳过: {}", column); + continue; + } + + try { + + // 2. 自动新增列 +// try { +// AddTDDatabaseColumn(deviceId, model); +// } catch (Exception ignored) { +// // 忽略已存在异常 +// } + + // 3. 值为空 -> 插入 NULL + Object convertedValue = null; + + if (valueObj != null) { + + String value = valueObj.toString(); + + if (!value.isEmpty()) { + + convertedValue = + JavaToTdengineTypeEnum.convertValue(javaType, value); + + } + } + + // 4. 拼接 SQL + columnBuilder.append(", ").append(column); + + valueBuilder.append(", ?"); + + params.add(convertedValue); + + } catch (Exception e) { + + log.error("TDengine 数据处理失败: column={}, value={}, type={}", + column, valueObj, javaType, e); + } + } + + // 没有有效字段不插入 + if (params.size() <= 1) { + + log.warn("TDengine 插入数据为空,跳过 deviceId={}", deviceId); + + return true; + } + + String sql = + "INSERT INTO " + + tableName + + " (" + columnBuilder + ") VALUES (" + valueBuilder + ")"; + + try { + + jdbcTemplate.update(sql, params.toArray()); + + log.info("TDengine 插入成功: table={}, columnCount={}", + tableName, params.size() - 1); + + + } catch (Exception e) { + + log.error("TDengine 插入失败: table={}, sql={}", tableName, sql, e); + return false; + } + + return true; + } + + + /** + * 根据deviceId批量查询最新时间 + * @param deviceIds + * @return + */ + @DS("tdengine") + public Map newSelectLatestTsBatch(List deviceIds) { + Map result = new HashMap<>(); + + for (Long deviceId : deviceIds) { + String tableName = "besure_server.d_" + deviceId; + String sql = "SELECT ts FROM " + tableName + " ORDER BY ts DESC LIMIT 1"; + + try { + LocalDateTime ts = jdbcTemplate.queryForObject(sql, (rs, rowNum) -> rs.getTimestamp("ts").toLocalDateTime()); + result.put(deviceId, ts); + } catch (Exception e) { + // 表不存在或者查询失败 + result.put(deviceId, null); + log.warn("设备: {}, 获取 ts 失败: {}", deviceId, e.getMessage()); + } + } + + return result; + } + + + /** + * 查询最新的数据 + * @param deviceId + * @return + */ + @DS("tdengine") + public Map newSelectLatestRow(Long deviceId) { + + if (deviceId == null) { + return null; + } + + String tableName = "besure_server.d_" + deviceId; + + String sql = "SELECT * FROM " + tableName + " ORDER BY ts DESC LIMIT 1"; + + try { + + List> list = jdbcTemplate.queryForList(sql); + + if (list == null || list.isEmpty()) { + return null; + } + + Map row = list.get(0); + + // 转换 ts 为 LocalDateTime(推荐) + Object tsObj = row.get("ts"); + if (tsObj instanceof Timestamp) { + row.put("ts", ((Timestamp) tsObj).toLocalDateTime()); + } + + log.debug("设备 {} 最新数据: {}", deviceId, row); + + return row; + + } catch (Exception e) { + + log.warn("设备 {} 查询最新数据失败: {}", deviceId, e.getMessage()); + + return null; + } + } + + + /** + * 查询指定设备在时间范围内的功率数据,并分页 + * + * @param deviceId 设备ID + * @param startTime 开始时间(字符串,格式如 "2026-02-09 00:00:00") + * @param endTime 结束时间 + * @param page 页码,从1开始 + * @param pageSize 每页条数 + * @return List> 每条记录包含 ts + 属性列 + */ + @DS("tdengine") + public List> newSelectBatchPage(Long deviceId, String startTime, String endTime, + Integer page, Integer pageSize) { + if (deviceId == null || page == null || pageSize == null) { + return Collections.emptyList(); + } + + String tableName = "besure_server.d_" + deviceId; + + // 计算分页偏移量 + int offset = (page - 1) * pageSize; + + // 构建 SQL + StringBuilder sqlBuilder = new StringBuilder(); + sqlBuilder.append("SELECT * FROM ").append(tableName) + .append(" WHERE 1=1 "); + + if (StringUtils.isNotBlank(startTime)) { + sqlBuilder.append(" AND ts >= '").append(startTime).append("' "); + } + if (StringUtils.isNotBlank(endTime)) { + sqlBuilder.append(" AND ts <= '").append(endTime).append("' "); + } + + sqlBuilder.append(" ORDER BY ts DESC ") + .append(" LIMIT ").append(pageSize) + .append(" OFFSET ").append(offset); + + String sql = sqlBuilder.toString(); + + try { + List> list = jdbcTemplate.queryForList(sql); + + // 如果 ts 需要转换为 LocalDateTime + for (Map row : list) { + Object tsObj = row.get("ts"); + if (tsObj instanceof Timestamp) { + row.put("ts", ((Timestamp) tsObj).toLocalDateTime()); + } + } + + return list; + + } catch (EmptyResultDataAccessException e) { + return Collections.emptyList(); + } catch (Exception e) { + log.error("查询设备 {} 功率数据异常", deviceId, e); + return Collections.emptyList(); + } + } + + /** + * 查询指定设备在时间范围内的数据总条数 + * + * @param deviceId 设备ID + * @param startTime 开始时间(可为空) + * @param endTime 结束时间(可为空) + * @return 总条数 + */ + @DS("tdengine") + public long newSelectBatchTotal(Long deviceId, String startTime, String endTime) { + if (deviceId == null) { + return 0; + } + + String tableName = "besure_server.d_" + deviceId; + + StringBuilder sqlBuilder = new StringBuilder(); + sqlBuilder.append("SELECT COUNT(*) AS total FROM ").append(tableName).append(" WHERE 1=1 "); + + if (StringUtils.isNotBlank(startTime)) { + sqlBuilder.append(" AND ts >= '").append(startTime).append("' "); + } + if (StringUtils.isNotBlank(endTime)) { + sqlBuilder.append(" AND ts <= '").append(endTime).append("' "); + } + + String sql = sqlBuilder.toString(); + + try { + // 查询单行结果 + Map result = jdbcTemplate.queryForMap(sql); + Object totalObj = result.get("total"); + + if (totalObj instanceof Number) { + return ((Number) totalObj).longValue(); + } else { + return 0; + } + } catch (EmptyResultDataAccessException e) { + return 0; + } catch (Exception e) { + log.error("查询设备 {} 数据总条数异常", deviceId, e); + return 0; + } + } + + + /** + * 查询指定设备在时间范围内的最新 N 条数据 + * + * @param deviceId 设备ID + * @param startTime 开始时间(可为空) + * @param endTime 结束时间(可为空) + * @param limit 查询条数 + * @return 数据列表,每条数据为 Map + */ + @DS("tdengine") + public List> newSelectLatestData(Long deviceId, String startTime, String endTime, int limit) { + if (deviceId == null || limit <= 0) { + return Collections.emptyList(); + } + + String tableName = "besure_server.d_" + deviceId; + + StringBuilder sqlBuilder = new StringBuilder(); + sqlBuilder.append("SELECT * FROM ").append(tableName).append(" WHERE 1=1 "); + + if (StringUtils.isNotBlank(startTime)) { + sqlBuilder.append(" AND ts >= '").append(startTime).append("' "); + } + if (StringUtils.isNotBlank(endTime)) { + sqlBuilder.append(" AND ts <= '").append(endTime).append("' "); + } + + sqlBuilder.append(" ORDER BY ts DESC "); // 按时间倒序 + sqlBuilder.append(" LIMIT ").append(limit); // 限制条数 + + String sql = sqlBuilder.toString(); + + try { + return jdbcTemplate.queryForList(sql); + } catch (EmptyResultDataAccessException e) { + return Collections.emptyList(); + } catch (Exception e) { + log.error("查询设备 {} 最新 {} 条数据异常", deviceId, limit, e); + return Collections.emptyList(); + } + } + + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/devicecontactmodel/DeviceContactModelServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/devicecontactmodel/DeviceContactModelServiceImpl.java index 0108420cb..f6b2f8d80 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/devicecontactmodel/DeviceContactModelServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/devicecontactmodel/DeviceContactModelServiceImpl.java @@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.service.devicecontactmodel; import cn.iocoder.yudao.module.iot.dal.dataobject.devicecontactmodel.DeviceContactModelDO; import cn.iocoder.yudao.module.iot.dal.dataobject.devicemodelattribute.DeviceModelAttributeDO; import cn.iocoder.yudao.module.iot.dal.mysql.deviceattributetype.DeviceAttributeTypeMapper; +import cn.iocoder.yudao.module.iot.service.device.TDengineService; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import org.springframework.stereotype.Service; import javax.annotation.Resource; @@ -34,6 +35,8 @@ public class DeviceContactModelServiceImpl implements DeviceContactModelService private DeviceContactModelMapper deviceContactModelMapper; @Resource private DeviceAttributeTypeMapper deviceAttributeTypeMapper; + @Resource + private TDengineService tDengineService; @Override public Long createDeviceContactModel(DeviceContactModelSaveReqVO createReqVO) { @@ -53,6 +56,10 @@ public class DeviceContactModelServiceImpl implements DeviceContactModelService DeviceContactModelDO deviceContactModel = BeanUtils.toBean(createReqVO, DeviceContactModelDO.class); // deviceContactModel.setTypeName(deviceAttributeTypeMapper.selectById(createReqVO.getAttributeCode()).getName()); deviceContactModelMapper.insert(deviceContactModel); + + //新增td数据库列 + tDengineService.AddTDDatabaseColumn(createReqVO.getDeviceId(),deviceContactModel); + // 返回 return deviceContactModel.getId(); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/devicemodelattribute/DeviceModelAttributeServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/devicemodelattribute/DeviceModelAttributeServiceImpl.java index 5723261e3..9b80ac4e5 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/devicemodelattribute/DeviceModelAttributeServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/devicemodelattribute/DeviceModelAttributeServiceImpl.java @@ -161,19 +161,21 @@ import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.*; } @Override - public List> operationAnalysisDetails(Long deviceId, Long modelId,String collectionStartTime, String collectionEndTime) { - if(deviceId == null){ + public List> operationAnalysisDetails(Long deviceId, Long modelId, + String collectionStartTime, String collectionEndTime) { + if (deviceId == null) { throw exception(DEVICE_ID_DOES_NOT_EXIST); } - if(modelId == null){ + if (modelId == null) { throw exception(POINT_ID_MODEL_NOT_EXISTS); } List> resultList = new ArrayList<>(); try { - // 1. 获取最新10条设备数据列表 - List> deviceDataList = tdengineService.getstDeviceDataOrderByTimeDesc(deviceId,collectionStartTime,collectionEndTime,10); + // 1. 获取最新10条设备数据列表(直接从TDengine获取原始字段) + List> deviceDataList = + tdengineService.newSelectLatestData(deviceId, collectionStartTime, collectionEndTime, 10); // 2. 获取属性类型映射 Map idToNameMap = deviceAttributeTypeMapper.selectList() @@ -185,66 +187,42 @@ import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.*; // 3. 遍历每个时间点的数据 for (Map deviceData : deviceDataList) { - String queryDataJson = (String) deviceData.get("queryData"); - Timestamp timestamp = (Timestamp) deviceData.get("timestamp"); - SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd HH:mm:ss"); - String formattedTime = sdf.format(timestamp); // 例如 "26-01-06 14:30:45" - if (StringUtils.isNotBlank(queryDataJson) && timestamp != null) { - List> dataList = new ObjectMapper().readValue( - queryDataJson, - new TypeReference>>() {} - ); - - // 按属性类型分组 - Map>> groupedData = new LinkedHashMap<>(); - - for (Map data : dataList) { - if (((Integer) data.get("id")).longValue() == modelId){ + Timestamp timestamp = (Timestamp) deviceData.get("ts"); // TDengine 时间列 + if (timestamp == null) continue; + SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd HH:mm:ss"); + String formattedTime = sdf.format(timestamp); - String attributeTypeName = "其他"; - String typeStr = (String) data.get("attributeType"); - if (typeStr != null) { - try { - attributeTypeName = typeStr; - } catch (Exception e) { - attributeTypeName = "未知"; - } - } - - Map simplifiedData = new HashMap<>(); - simplifiedData.put("addressValue", data.get("addressValue")); - simplifiedData.put("attributeName", data.get("attributeName")); - - groupedData - .computeIfAbsent(attributeTypeName, k -> new ArrayList<>()) - .add(simplifiedData); - } - } + Map timePointData = new LinkedHashMap<>(); + timePointData.put("collectTime", formattedTime); - // 创建当前时间点的Map - Map timePointData = new LinkedHashMap<>(); + // 遍历所有字段(除 ts 之外) + Map>> groupedData = new LinkedHashMap<>(); + for (Map.Entry entry : deviceData.entrySet()) { + String key = entry.getKey(); + if ("ts".equalsIgnoreCase(key)) continue; // 跳过时间字段 - // 添加属性分组 - for (Map.Entry>> entry : groupedData.entrySet()) { - timePointData.put(entry.getKey(), entry.getValue()); - } + Map simplifiedData = new HashMap<>(); + simplifiedData.put("attributeName", key); + simplifiedData.put("addressValue", entry.getValue()); - // 添加收集时间 - timePointData.put("collectTime", formattedTime); + // 根据 modelId 或 attributeType 分类,这里暂时按 "其他" 分组 + groupedData.computeIfAbsent("其他", k -> new ArrayList<>()).add(simplifiedData); + } - resultList.add(timePointData); + // 添加属性分组 + for (Map.Entry>> entry : groupedData.entrySet()) { + timePointData.put(entry.getKey(), entry.getValue()); } + + resultList.add(timePointData); } } catch (Exception e) { - log.error(e.getMessage()); -// e.printStackTrace(); -// throw new RuntimeException("处理设备数据时发生异常", e); + log.error("处理设备 {} 数据时发生异常", deviceId, e); return new ArrayList<>(); } - return resultList; } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/resources/mapper/device/DeviceMapper.xml b/yudao-module-iot/yudao-module-iot-biz/src/main/resources/mapper/device/DeviceMapper.xml index c5baca5f4..33ad11d49 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/resources/mapper/device/DeviceMapper.xml +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/resources/mapper/device/DeviceMapper.xml @@ -146,4 +146,21 @@ where machine_id = #{deviceId} and deleted = 0 + + + \ No newline at end of file diff --git a/yudao-server/src/main/resources/application-dev.yaml b/yudao-server/src/main/resources/application-dev.yaml index 16718eb41..2a474db52 100644 --- a/yudao-server/src/main/resources/application-dev.yaml +++ b/yudao-server/src/main/resources/application-dev.yaml @@ -56,7 +56,7 @@ spring: tdengine: name: tdengine - url: jdbc:TAOS-RS://ngsk.tech:16042/besure?charset=UTF-8&locale=en_US.UTF-8 + url: jdbc:TAOS-RS://ngsk.tech:16042/besure_server?charset=UTF-8&locale=en_US.UTF-8 username: root password: taosdata driver-class-name: com.taosdata.jdbc.rs.RestfulDriver @@ -88,7 +88,7 @@ spring: # Quartz 配置项,对应 QuartzProperties 配置类 spring: quartz: - auto-startup: true # 本地开发环境,尽量不要开启 Job + auto-startup: false # 本地开发环境,尽量不要开启 Job scheduler-name: schedulerName # Scheduler 名字。默认为 schedulerName job-store-type: jdbc # Job 存储器类型。默认为 memory 表示内存,可选 jdbc 使用数据库。 wait-for-jobs-to-complete-on-shutdown: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true @@ -266,10 +266,10 @@ justauth: emqx: is-enable: true # 是否启用 MQTT - broker: tcp://192.168.5.119:1883 # EMQX 服务器地址(TCP 协议) + broker: tcp://47.106.185.127:1883 # EMQX 服务器地址(TCP 协议) client-id: mqtt-client-besure_server-dev # 客户端ID - user-name: ngsk # 用户名 - password: ngskcloud0809 # 密码 + user-name: admin # 用户名 + password: admin # 密码 clean-session: true # 是否清空 session reconnect: true # 是否自动断线重连 timeout: 30 # 连接超时时间(秒) diff --git a/yudao-server/src/main/resources/application-prod.yaml b/yudao-server/src/main/resources/application-prod.yaml index 43dbb1956..01b2991a2 100644 --- a/yudao-server/src/main/resources/application-prod.yaml +++ b/yudao-server/src/main/resources/application-prod.yaml @@ -56,7 +56,7 @@ spring: tdengine: name: tdengine - url: jdbc:TAOS-RS://192.168.21.2:6041/besure?charset=UTF-8&locale=en_US.UTF-8 + url: jdbc:TAOS-RS://192.168.21.2:6041/besure_server?charset=UTF-8&locale=en_US.UTF-8 username: root password: taosdata driver-class-name: com.taosdata.jdbc.rs.RestfulDriver @@ -77,11 +77,11 @@ spring: # Redis 配置。Redisson 默认的配置足够使用,一般不需要进行调优 redis: - host: 192.168.21.2 # 地址 + host: 47.106.185.127 # 地址 port: 6379 # 端口 database: 0 # 数据库索引 #password: bkcaydy8ydhZZnS2 # 密码,建议生产环境开启 - password: redis123 + password: BstPwd258 --- #################### 定时任务相关配置 #################### @@ -265,10 +265,10 @@ justauth: emqx: is-enable: true # 是否启用 MQTT - broker: tcp://192.168.21.2:1883 # EMQX 服务器地址(TCP 协议) - client-id: mqtt-client-besure-server-prod # 客户端ID + broker: tcp://47.106.185.127:1883 # EMQX 服务器地址(TCP 协议) + client-id: mqtt-client-besure_server-prod # 客户端ID user-name: admin # 用户名 - password: admin # 密码 + password: admin # 密码 clean-session: true # 是否清空 session reconnect: true # 是否自动断线重连 timeout: 30 # 连接超时时间(秒)