diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/core/Task.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/core/Task.java new file mode 100644 index 0000000000..b1ae69cb89 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/core/Task.java @@ -0,0 +1,14 @@ +package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.core; + +public interface Task { + + /** + * 执行任务 + */ + void execute(Long taskId, String taskParam); + + /** + * 获取任务类型 + */ + String getTaskType(); +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/coretask/DeviceTask.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/coretask/DeviceTask.java index dfd1907f46..cc81695440 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/coretask/DeviceTask.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/coretask/DeviceTask.java @@ -1,39 +1,193 @@ +// DeviceTask.java - 原有设备任务 package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.coretask; + +import cn.iocoder.yudao.framework.common.util.opc.OpcUtils; +import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.core.Task; +import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceDO; +import cn.iocoder.yudao.module.iot.dal.devicecontactmodel.DeviceContactModelDO; +import cn.iocoder.yudao.module.iot.dal.mysql.device.DeviceMapper; +import cn.iocoder.yudao.module.iot.dal.mysql.devicecontactmodel.DeviceContactModelMapper; +import cn.iocoder.yudao.module.iot.service.device.TDengineService; +import com.alibaba.fastjson.JSON; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import javax.annotation.Resource; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Date; +import java.util.List; @Component -public class DeviceTask { +@Slf4j +public class DeviceTask implements Task { private static final Logger logger = LoggerFactory.getLogger(DeviceTask.class); - /** - * 具体的设备执行逻辑 - * @param deviceId 设备ID - * @param deviceCode 设备编码 - */ - public void executeDeviceLogic(Long deviceId, String deviceCode) { + @Resource + private DeviceContactModelMapper deviceContactModelMapper; + + @Resource + private DeviceMapper deviceMapper; + + @Resource + private TDengineService tDengineService; + + @Override + public String getTaskType() { + return "DEVICE"; + } + + @Override + public void execute(Long taskId, String taskParam) { try { - // 创建时间格式化对象 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String currentTime = sdf.format(new Date()); - logger.info("执行设备任务,设备ID: {}, 设备编码: {}, 时间: {}", - deviceId, deviceCode, currentTime); + logger.info("执行设备任务,任务ID: {}, 参数: {}, 时间: {}", + taskId, taskParam, currentTime); + + // 解析参数,假设格式为 deviceId:deviceCode +// String[] params = taskParam.split(":"); +// if (params.length >= 2) { +// Long deviceId = Long.parseLong(params[0]); +// String deviceCode = params[1]; +// executeDeviceLogic(deviceId, deviceCode); +// } + executeDeviceLogic(taskId,taskParam); - // TODO: 这里编写具体的设备执行逻辑 - // 比如:读取设备数据、发送指令、处理响应等 } catch (Exception e) { - // 异常信息中也加入时间戳 - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - String errorTime = sdf.format(new Date()); + logger.error("执行设备任务异常,任务ID: {}", taskId, e); + } + } + + /** + * 具体的设备执行逻辑 + */ + private void executeDeviceLogic(Long deviceId, String param) { + logger.info("执行设备 {} 的具体逻辑,参数: {}", deviceId, param); + + // 1. 参数校验 + if (deviceId == null){ + logger.error("设备ID不能为空"); + throw new RuntimeException("设备ID不能为空"); + } + + // 2. 查询设备信息 + DeviceDO deviceDO = deviceMapper.selectById(deviceId); + if (deviceDO == null) { + logger.error("设备不存在,设备ID: {}", deviceId); + throw new RuntimeException("设备不存在,设备ID: " + deviceId); + } + + if (deviceDO.getUrl() == null || deviceDO.getUrl().trim().isEmpty()) { + logger.error("设备URL不能为空,设备ID: {}", deviceId); + throw new RuntimeException("设备URL不能为空"); + } + + // 3. 连接OPC服务器 + String username = deviceDO.getUsername() != null ? deviceDO.getUsername() : ""; + String password = deviceDO.getPassword() != null ? deviceDO.getPassword() : ""; - logger.error("执行设备任务异常,设备ID: {}, 异常时间: {}", - deviceId, errorTime, e); + boolean connected = OpcUtils.connect(deviceDO.getUrl(), username, password, 10); + if (!connected) { + logger.error("连接OPC服务器失败,设备ID: {},URL: {}", deviceId, deviceDO.getUrl()); + throw new RuntimeException("连接OPC服务器失败"); + } + + // 4. 查询设备点位配置 + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(DeviceContactModelDO::getDeviceId, deviceId); + + List deviceContactModelDOS = deviceContactModelMapper.selectList(queryWrapper); + + // 5. 判断是否有点位数据 + if (deviceContactModelDOS == null || deviceContactModelDOS.isEmpty()) { + logger.warn("设备 {} 没有配置数据点位,跳过数据读取", deviceId); + return; + } + + logger.info("设备 {} 共有 {} 个点位需要读取", deviceId, deviceContactModelDOS.size()); + + // 6. 读取OPC数据 + int successCount = 0; + List validDataList = new ArrayList<>(); + + for (DeviceContactModelDO deviceContactModelDO : deviceContactModelDOS) { + try { + // 判断点位地址是否有效 + String address = deviceContactModelDO.getAddress(); + if (address == null || address.trim().isEmpty()) { + logger.warn("点位ID {} 的地址为空,跳过", deviceContactModelDO.getId()); + continue; + } + + // 读取OPC值 + Object addressValue = OpcUtils.readValue(address); + + if (addressValue == null) { + logger.warn("读取点位 {} 的值返回null,地址: {}", + deviceContactModelDO.getId(), address); + } else { + // 值验证 + if (addressValue instanceof String) { + String strValue = (String) addressValue; + if (strValue.trim().isEmpty()) { + logger.warn("读取点位 {} 的值为空字符串", deviceContactModelDO.getId()); + deviceContactModelDO.setAddressValue(""); + } else { + deviceContactModelDO.setAddressValue(addressValue); + successCount++; + } + } else { + deviceContactModelDO.setAddressValue(addressValue); + successCount++; + } + } + + validDataList.add(deviceContactModelDO); + + } catch (Exception e) { + logger.error("读取点位 {} 异常,地址: {}", + deviceContactModelDO.getId(), + deviceContactModelDO.getAddress(), e); + + } + } + + // 7. 判断是否有有效数据 + if (validDataList.isEmpty()) { + logger.warn("设备 {} 没有读取到任何有效数据,跳过入库", deviceId); + return; + } + + logger.info("设备 {} 成功读取 {} 个点位数据,总计 {} 个点位", + deviceId, successCount, validDataList.size()); + + try { + // 8. 数据入库 + String json = JSON.toJSONString(validDataList); + boolean insertSuccess = tDengineService.insertDeviceData(deviceId, json); + + if (insertSuccess) { + logger.info("设备 {} 数据成功插入TDengine,数据量: {}", deviceId, validDataList.size()); + + } else { + logger.error("设备 {} 数据插入TDengine失败", deviceId); + } + + } catch (Exception e) { + logger.error("设备 {} 数据入库异常", deviceId, e); + } finally { + // 9. 确保断开连接 + try { + OpcUtils.disconnect(); + } catch (Exception e) { + logger.error("断开OPC连接异常,设备ID: {}", deviceId, e); + } } } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/coretask/GenerateWorkOrderTask.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/coretask/GenerateWorkOrderTask.java new file mode 100644 index 0000000000..326afa6ae6 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/coretask/GenerateWorkOrderTask.java @@ -0,0 +1,49 @@ +// DataSyncTask.java - 数据同步任务 +package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.coretask; + +import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.core.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.text.SimpleDateFormat; +import java.util.Date; + +@Component +public class GenerateWorkOrderTask implements Task { + private static final Logger logger = LoggerFactory.getLogger(GenerateWorkOrderTask.class); + + @Override + public String getTaskType() { + return "WORK_ORDER"; + } + + @Override + public void execute(Long taskId, String taskParam) { + try { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String currentTime = sdf.format(new Date()); + + logger.info("执行数据同步任务,任务ID: {}, 参数: {}, 时间: {}", + taskId, taskParam, currentTime); + + // 解析同步参数 + // 格式示例: "source:target" 或 "table:condition" + syncData(taskParam); + + } catch (Exception e) { + logger.error("执行数据同步任务异常,任务ID: {}", taskId, e); + } + } + + private void syncData(String param) { + // TODO: 数据同步逻辑 + logger.info("开始同步数据,参数: {}", param); + + // 示例: + // 1. 从源数据库读取数据 + // 2. 转换数据格式 + // 3. 写入目标数据库 + // 4. 记录同步日志 + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/scheduler/TaskSchedulerManager.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/scheduler/TaskSchedulerManager.java index c374cceb03..1a3edfba35 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/scheduler/TaskSchedulerManager.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/scheduler/TaskSchedulerManager.java @@ -1,142 +1,230 @@ // TaskSchedulerManager.java package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.scheduler; -import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.coretask.DeviceTask; +import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.core.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.support.CronTrigger; +import org.springframework.scheduling.support.PeriodicTrigger; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import java.time.Duration; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; @Component public class TaskSchedulerManager { private static final Logger logger = LoggerFactory.getLogger(TaskSchedulerManager.class); - private TaskScheduler taskScheduler; // 移除 static - private DeviceTask deviceTask; // 移除 static - private final Map> taskMap = new ConcurrentHashMap<>(); + private final TaskScheduler taskScheduler; + + // 存储所有任务 + private final Map taskBeans = new ConcurrentHashMap<>(); + + // 存储正在运行的任务 + private final Map> runningTasks = new ConcurrentHashMap<>(); + private final Map taskInfos = new ConcurrentHashMap<>(); + + // 任务信息类 + private static class TaskInfo { + private Long taskId; + private String taskType; + private String taskParam; + private String cronExpression; + private String scheduleType; // "CRON" 或 "FIXED_RATE" 或 "FIXED_DELAY" + private Long initialDelay; + private Long period; + private TimeUnit timeUnit; + + // 构造器 + public TaskInfo(Long taskId, String taskType, String taskParam) { + this.taskId = taskId; + this.taskType = taskType; + this.taskParam = taskParam; + } + + // getters and setters... + } + + // 通过构造器注入 + public TaskSchedulerManager(TaskScheduler taskScheduler, + @Autowired(required = false) java.util.List tasks) { + this.taskScheduler = taskScheduler; + + // 注册所有任务 + if (tasks != null) { + for (Task task : tasks) { + taskBeans.put(task.getTaskType(), task); + logger.info("注册任务类型: {}", task.getTaskType()); + } + } + } /** - * 通过构造器注入 TaskScheduler - * Spring 会自动在上下文中查找 TaskScheduler Bean + * 启动 Cron 定时任务 */ - @Autowired - public TaskSchedulerManager(TaskScheduler taskScheduler) { - this.taskScheduler = taskScheduler; + public boolean startCronTask(Long taskId, String taskType, String taskParam, String cronExpression) { + return startTask(taskId, taskType, taskParam, "CRON", cronExpression, null, null, null); } /** - * 注入 DeviceTask + * 启动固定频率任务 */ - @Autowired - public void setDeviceTask(DeviceTask deviceTask) { - this.deviceTask = deviceTask; + public boolean startFixedRateTask(Long taskId, String taskType, String taskParam, + long initialDelay, long period, TimeUnit timeUnit) { + return startTask(taskId, taskType, taskParam, "FIXED_RATE", null, + initialDelay, period, timeUnit); } /** - * 如果 Spring 容器中没有 TaskScheduler Bean, - * 可以在这里创建一个默认的 + * 启动固定延迟任务 */ - @PostConstruct - public void init() { - if (taskScheduler == null) { - logger.warn("TaskScheduler not found in context, creating default one"); - ThreadPoolTaskScheduler defaultScheduler = new ThreadPoolTaskScheduler(); - defaultScheduler.setPoolSize(10); - defaultScheduler.setThreadNamePrefix("device-task-"); - defaultScheduler.initialize(); - this.taskScheduler = defaultScheduler; - } + public boolean startFixedDelayTask(Long taskId, String taskType, String taskParam, + long initialDelay, long period, TimeUnit timeUnit) { + return startTask(taskId, taskType, taskParam, "FIXED_DELAY", null, + initialDelay, period, timeUnit); } /** - * 启动设备定时任务 - * 移除 static 修饰符 + * 通用启动任务方法 */ - public boolean startDeviceTask(Long deviceId, String deviceCode, String cronExpression) { + private boolean startTask(Long taskId, String taskType, String taskParam, + String scheduleType, String cronExpression, + Long initialDelay, Long period, TimeUnit timeUnit) { try { - // 先停止已存在的任务 - stopDeviceTask(deviceId); + // 停止已存在的任务 + stopTask(taskId); - // 验证 DeviceTask - if (deviceTask == null) { - logger.error("DeviceTask is not initialized"); + // 获取任务实例 + Task task = taskBeans.get(taskType); + if (task == null) { + logger.error("任务类型不存在: {}", taskType); return false; } - // 创建新的定时任务 - ScheduledFuture future = taskScheduler.schedule( - () -> { - try { - deviceTask.executeDeviceLogic(deviceId, deviceCode); - } catch (Exception e) { - logger.error("Device task execution failed for deviceId: {}", deviceId, e); - } - }, - new CronTrigger(cronExpression) - ); - - taskMap.put(deviceId, future); - logger.info("启动设备定时任务成功,设备ID: {}, cron表达式: {}", deviceId, cronExpression); - return true; + // 创建任务信息 + TaskInfo taskInfo = new TaskInfo(taskId, taskType, taskParam); + taskInfo.scheduleType = scheduleType; + taskInfo.cronExpression = cronExpression; + taskInfo.initialDelay = initialDelay; + taskInfo.period = period; + taskInfo.timeUnit = timeUnit; + taskInfos.put(taskId, taskInfo); + + // 根据调度类型创建任务 + ScheduledFuture future = null; + switch (scheduleType) { + case "CRON": + future = taskScheduler.schedule( + () -> task.execute(taskId, taskParam), + new CronTrigger(cronExpression) + ); + break; + + case "FIXED_RATE": + if (initialDelay != null && period != null && timeUnit != null) { + future = taskScheduler.scheduleAtFixedRate( + () -> task.execute(taskId, taskParam), + initialDelay + ); + } + break; + + case "FIXED_DELAY": + if (initialDelay != null && period != null && timeUnit != null) { + future = taskScheduler.scheduleWithFixedDelay( + () -> task.execute(taskId, taskParam), + initialDelay + ); + } + break; + } + + if (future != null) { + runningTasks.put(taskId, future); + logger.info("启动{}任务成功,任务ID: {}, 类型: {}, 参数: {}", + scheduleType, taskId, taskType, taskParam); + return true; + } + + return false; } catch (Exception e) { - logger.error("启动设备定时任务失败,设备ID: {}", deviceId, e); + logger.error("启动任务失败,任务ID: {}", taskId, e); return false; } } /** - * 停止设备定时任务 - * 移除 static 修饰符 + * 停止任务 */ - public boolean stopDeviceTask(Long deviceId) { + public boolean stopTask(Long taskId) { try { - ScheduledFuture future = taskMap.get(deviceId); + ScheduledFuture future = runningTasks.get(taskId); if (future != null) { future.cancel(true); - taskMap.remove(deviceId); - logger.info("停止设备定时任务成功,设备ID: {}", deviceId); + runningTasks.remove(taskId); + taskInfos.remove(taskId); + logger.info("停止任务成功,任务ID: {}", taskId); return true; - } else { - logger.info("设备定时任务不存在,设备ID: {}", deviceId); - return false; } + return false; } catch (Exception e) { - logger.error("停止设备定时任务失败,设备ID: {}", deviceId, e); + logger.error("停止任务失败,任务ID: {}", taskId, e); return false; } } /** - * 检查设备定时任务是否在运行 + * 获取任务信息 */ - public boolean isTaskRunning(Long deviceId) { - ScheduledFuture future = taskMap.get(deviceId); + public TaskInfo getTaskInfo(Long taskId) { + return taskInfos.get(taskId); + } + + /** + * 检查任务是否运行 + */ + public boolean isTaskRunning(Long taskId) { + ScheduledFuture future = runningTasks.get(taskId); return future != null && !future.isCancelled() && !future.isDone(); } /** - * 获取任务数量 + * 获取所有任务类型 */ - public int getTaskCount() { - return taskMap.size(); + public Map getAllTaskTypes() { + return new ConcurrentHashMap<>(taskBeans); + } + + /** + * 获取运行中的任务数量 + */ + public int getRunningTaskCount() { + return runningTasks.size(); } /** * 停止所有任务 */ public void stopAllTasks() { - logger.info("停止所有定时任务,共 {} 个", taskMap.size()); - for (Long deviceId : taskMap.keySet()) { - stopDeviceTask(deviceId); + logger.info("停止所有定时任务,共 {} 个", runningTasks.size()); + for (Long taskId : runningTasks.keySet()) { + stopTask(taskId); } } + + /** + * 原设备任务方法(保持兼容性) + */ + public boolean startDeviceTask(Long deviceId, String deviceCode, String cronExpression) { + String taskParam = deviceId + ":" + deviceCode; + return startCronTask(deviceId, "DEVICE", taskParam, cronExpression); + } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java index fd6babce8e..84faedadf2 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java @@ -406,23 +406,13 @@ public class DeviceServiceImpl implements DeviceService { deviceDO.setStatus(String.valueOf(DeviceConnectionStatusEnum.CONNECTED.getStatus())); deviceMapper.updateById(deviceDO); + String cronExpression = CronExpressionUtils.secondsToCron(deviceDO.getSampleCycle()); - //查询存储 - LambdaQueryWrapper deviceModelAttributeLambdaQueryWrapper = new LambdaQueryWrapper<>(); - deviceModelAttributeLambdaQueryWrapper.eq(DeviceContactModelDO::getDeviceId,createReqVO.getId()); - List deviceContactModelDOS = deviceContactModelMapper.selectList(deviceModelAttributeLambdaQueryWrapper); - - - if (deviceContactModelDOS != null && deviceContactModelDOS.size() > 0){ - for (DeviceContactModelDO deviceContactModelDO : deviceContactModelDOS) { - Object addressValue = OpcUtils.readValue(deviceContactModelDO.getAddress() != null ? deviceContactModelDO.getAddress() : ""); - deviceContactModelDO.setAddressValue(addressValue); - } - String json = JSON.toJSONString(deviceContactModelDOS); - tdengineService.insertDeviceData(createReqVO.getId(),json); - } - - + taskSchedulerManager.startDeviceTask( + deviceDO.getId(), + deviceDO.getDeviceCode(), + cronExpression + ); }else { @@ -433,6 +423,7 @@ public class DeviceServiceImpl implements DeviceService { if (disconnect){ deviceDO.setStatus(String.valueOf(DeviceConnectionStatusEnum.DISCONNECTED.getStatus())); deviceMapper.updateById(deviceDO); + taskSchedulerManager.stopTask(deviceDO.getId()); }else { throw exception(OPC_CLOSE_CONNECT_FAILURE); } @@ -723,7 +714,7 @@ public class DeviceServiceImpl implements DeviceService { public Boolean scheduledStop(Long id) { try { // 1. 停止定时任务 - taskSchedulerManager.stopDeviceTask(id); + taskSchedulerManager.stopTask(id); return true;