diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/enums/TaskTypeEnum.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/enums/TaskTypeEnum.java new file mode 100644 index 0000000000..1642fddd2e --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/enums/TaskTypeEnum.java @@ -0,0 +1,60 @@ +// TaskTypeEnum.java +package cn.iocoder.yudao.module.iot.controller.admin.device.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public enum TaskTypeEnum { + + DEVICE("DEVICE", "设备数据采集"), + WORK_ORDER("WORK_ORDER", "工单生成"); + + /** + * 任务类型编码 + */ + private final String code; + + /** + * 任务类型名称 + */ + private final String name; + + /** + * 根据编码获取枚举 + */ + public static TaskTypeEnum getByCode(String code) { + for (TaskTypeEnum type : values()) { + if (type.getCode().equals(code)) { + return type; + } + } + return null; + } + + /** + * 检查编码是否存在 + */ + public static boolean contains(String code) { + return getByCode(code) != null; + } + + /** + * 生成任务ID + */ + public Long generateTaskId(Long baseId) { + if (baseId == null) { + baseId = 0L; + } + + switch (this) { + case DEVICE: + return 1000000L + baseId; + case WORK_ORDER: + return 2000000L + baseId; + default: + return 9000000L + baseId; + } + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/config/SchedulerConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/config/SchedulerConfig.java index b38dcfb592..89539d15fb 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/config/SchedulerConfig.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/config/SchedulerConfig.java @@ -13,11 +13,16 @@ public class SchedulerConfig { @Bean public TaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); - scheduler.setPoolSize(20); // 设置线程池大小 - scheduler.setThreadNamePrefix("scheduled-task-"); // 线程名前缀 - scheduler.setAwaitTerminationSeconds(60); // 等待任务完成的秒数 - scheduler.setWaitForTasksToCompleteOnShutdown(true); // 关闭时等待任务完成 - scheduler.initialize(); // 初始化 + scheduler.setPoolSize(50); // 增加线程数 + scheduler.setThreadNamePrefix("scheduled-task-"); + scheduler.setAwaitTerminationSeconds(60); + scheduler.setWaitForTasksToCompleteOnShutdown(true); + + // 设置队列容量 + scheduler.setPoolSize(50); + scheduler.setThreadPriority(Thread.NORM_PRIORITY); + scheduler.setDaemon(false); + scheduler.initialize(); return scheduler; } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/core/Task.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/core/Task.java new file mode 100644 index 0000000000..b1ae69cb89 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/core/Task.java @@ -0,0 +1,14 @@ +package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.core; + +public interface Task { + + /** + * 执行任务 + */ + void execute(Long taskId, String taskParam); + + /** + * 获取任务类型 + */ + String getTaskType(); +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/coretask/DeviceTask.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/coretask/DeviceTask.java index dfd1907f46..98a41f0e9d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/coretask/DeviceTask.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/coretask/DeviceTask.java @@ -1,39 +1,196 @@ +// DeviceTask.java - 原有设备任务 package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.coretask; + +import cn.iocoder.yudao.framework.common.util.opc.OpcUtils; +import cn.iocoder.yudao.module.iot.controller.admin.device.enums.TaskTypeEnum; +import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.core.Task; +import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceDO; +import cn.iocoder.yudao.module.iot.dal.devicecontactmodel.DeviceContactModelDO; +import cn.iocoder.yudao.module.iot.dal.mysql.device.DeviceMapper; +import cn.iocoder.yudao.module.iot.dal.mysql.devicecontactmodel.DeviceContactModelMapper; +import cn.iocoder.yudao.module.iot.service.device.TDengineService; +import com.alibaba.fastjson.JSON; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import javax.annotation.Resource; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Date; +import java.util.List; @Component -public class DeviceTask { +@Slf4j +public class DeviceTask implements Task { private static final Logger logger = LoggerFactory.getLogger(DeviceTask.class); - /** - * 具体的设备执行逻辑 - * @param deviceId 设备ID - * @param deviceCode 设备编码 - */ - public void executeDeviceLogic(Long deviceId, String deviceCode) { + @Resource + private DeviceContactModelMapper deviceContactModelMapper; + + @Resource + private DeviceMapper deviceMapper; + + @Resource + private TDengineService tDengineService; + + @Override + public String getTaskType() { + return TaskTypeEnum.DEVICE.getCode(); + } + + @Override + public void execute(Long taskId, String taskParam) { try { - // 创建时间格式化对象 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String currentTime = sdf.format(new Date()); - logger.info("执行设备任务,设备ID: {}, 设备编码: {}, 时间: {}", - deviceId, deviceCode, currentTime); + logger.info("执行设备任务,任务ID: {}, 参数: {}, 时间: {}", + taskId, taskParam, currentTime); + + // 解析参数,假设格式为 deviceId:deviceCode +// String[] params = taskParam.split(":"); +// if (params.length >= 2) { +// Long deviceId = Long.parseLong(params[0]); +// String deviceCode = params[1]; +// executeDeviceLogic(deviceId, deviceCode); +// } + executeDeviceLogic(taskId,taskParam); - // TODO: 这里编写具体的设备执行逻辑 - // 比如:读取设备数据、发送指令、处理响应等 } catch (Exception e) { - // 异常信息中也加入时间戳 - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - String errorTime = sdf.format(new Date()); + logger.error("执行设备任务异常,任务ID: {}", taskId, e); + } + } + + /** + * 具体的设备执行逻辑 + */ + private void executeDeviceLogic(Long sourceDeviceId, String param) { + logger.info("执行设备 {} 的具体逻辑,参数: {}", sourceDeviceId, param); + Long deviceId = sourceDeviceId - 1000000L; + logger.info("处理后id:{} ", deviceId ); + + // 1. 参数校验 + if (deviceId == null){ + logger.error("设备ID不能为空"); + throw new RuntimeException("设备ID不能为空"); + } + + // 2. 查询设备信息 + DeviceDO deviceDO = deviceMapper.selectById(deviceId); + if (deviceDO == null) { + logger.error("设备不存在,设备ID: {}", deviceId); + throw new RuntimeException("设备不存在,设备ID: " + deviceId); + } + + if (deviceDO.getUrl() == null || deviceDO.getUrl().trim().isEmpty()) { + logger.error("设备URL不能为空,设备ID: {}", deviceId); + throw new RuntimeException("设备URL不能为空"); + } + + // 3. 连接OPC服务器 + String username = deviceDO.getUsername() != null ? deviceDO.getUsername() : ""; + String password = deviceDO.getPassword() != null ? deviceDO.getPassword() : ""; - logger.error("执行设备任务异常,设备ID: {}, 异常时间: {}", - deviceId, errorTime, e); + boolean connected = OpcUtils.connect(deviceDO.getUrl(), username, password, 10); + if (!connected) { + logger.error("连接OPC服务器失败,设备ID: {},URL: {}", deviceId, deviceDO.getUrl()); + throw new RuntimeException("连接OPC服务器失败"); + } + + // 4. 查询设备点位配置 + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(DeviceContactModelDO::getDeviceId, deviceId); + + List deviceContactModelDOS = deviceContactModelMapper.selectList(queryWrapper); + + // 5. 判断是否有点位数据 + if (deviceContactModelDOS == null || deviceContactModelDOS.isEmpty()) { + logger.warn("设备 {} 没有配置数据点位,跳过数据读取", deviceId); + return; + } + + logger.info("设备 {} 共有 {} 个点位需要读取", deviceId, deviceContactModelDOS.size()); + + // 6. 读取OPC数据 + int successCount = 0; + List validDataList = new ArrayList<>(); + + for (DeviceContactModelDO deviceContactModelDO : deviceContactModelDOS) { + try { + // 判断点位地址是否有效 + String address = deviceContactModelDO.getAddress(); + if (address == null || address.trim().isEmpty()) { + logger.warn("点位ID {} 的地址为空,跳过", deviceContactModelDO.getId()); + continue; + } + + // 读取OPC值 + Object addressValue = OpcUtils.readValue(address); + + if (addressValue == null) { + logger.warn("读取点位 {} 的值返回null,地址: {}", + deviceContactModelDO.getId(), address); + } else { + // 值验证 + if (addressValue instanceof String) { + String strValue = (String) addressValue; + if (strValue.trim().isEmpty()) { + logger.warn("读取点位 {} 的值为空字符串", deviceContactModelDO.getId()); + deviceContactModelDO.setAddressValue(""); + } else { + deviceContactModelDO.setAddressValue(addressValue); + successCount++; + } + } else { + deviceContactModelDO.setAddressValue(addressValue); + successCount++; + } + } + + validDataList.add(deviceContactModelDO); + + } catch (Exception e) { + logger.error("读取点位 {} 异常,地址: {}", + deviceContactModelDO.getId(), + deviceContactModelDO.getAddress(), e); + + } + } + + // 7. 判断是否有有效数据 + if (validDataList.isEmpty()) { + logger.warn("设备 {} 没有读取到任何有效数据,跳过入库", deviceId); + return; + } + + logger.info("设备 {} 成功读取 {} 个点位数据,总计 {} 个点位", + deviceId, successCount, validDataList.size()); + + try { + // 8. 数据入库 + String json = JSON.toJSONString(validDataList); + boolean insertSuccess = tDengineService.insertDeviceData(deviceId, json); + + if (insertSuccess) { + logger.info("设备 {} 数据成功插入TDengine,数据量: {}", deviceId, validDataList.size()); + + } else { + logger.error("设备 {} 数据插入TDengine失败", deviceId); + } + + } catch (Exception e) { + logger.error("设备 {} 数据入库异常", deviceId, e); + } finally { + // 9. 确保断开连接 + try { + OpcUtils.disconnect(); + } catch (Exception e) { + logger.error("断开OPC连接异常,设备ID: {}", deviceId, e); + } } } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/scheduler/TaskSchedulerManager.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/scheduler/TaskSchedulerManager.java index c374cceb03..239d0cb37f 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/scheduler/TaskSchedulerManager.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/scheduled/scheduler/TaskSchedulerManager.java @@ -1,142 +1,283 @@ // TaskSchedulerManager.java package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.scheduler; -import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.coretask.DeviceTask; +import cn.iocoder.yudao.module.iot.controller.admin.device.enums.TaskTypeEnum; +import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.core.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.support.CronTrigger; +import org.springframework.scheduling.support.PeriodicTrigger; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import java.time.Duration; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; @Component public class TaskSchedulerManager { private static final Logger logger = LoggerFactory.getLogger(TaskSchedulerManager.class); - private TaskScheduler taskScheduler; // 移除 static - private DeviceTask deviceTask; // 移除 static - private final Map> taskMap = new ConcurrentHashMap<>(); + private final TaskScheduler taskScheduler; + + // 存储所有任务 + private final Map taskBeans = new ConcurrentHashMap<>(); + + // 存储正在运行的任务 + private final Map> runningTasks = new ConcurrentHashMap<>(); + private final Map taskInfos = new ConcurrentHashMap<>(); + + // 任务信息类 + private static class TaskInfo { + private Long taskId; + private String taskType; + private String taskParam; + private String cronExpression; + private String scheduleType; // "CRON" 或 "FIXED_RATE" 或 "FIXED_DELAY" + private Long initialDelay; + private Long period; + private TimeUnit timeUnit; + + // 构造器 + public TaskInfo(Long taskId, String taskType, String taskParam) { + this.taskId = taskId; + this.taskType = taskType; + this.taskParam = taskParam; + } + + // getters and setters... + } + + // 通过构造器注入 + public TaskSchedulerManager(TaskScheduler taskScheduler, + @Autowired(required = false) java.util.List tasks) { + this.taskScheduler = taskScheduler; + + // 注册所有任务 + if (tasks != null) { + for (Task task : tasks) { + taskBeans.put(task.getTaskType(), task); + logger.info("注册任务类型: {}", task.getTaskType()); + } + } + } /** - * 通过构造器注入 TaskScheduler - * Spring 会自动在上下文中查找 TaskScheduler Bean + * 启动 Cron 定时任务 */ - @Autowired - public TaskSchedulerManager(TaskScheduler taskScheduler) { - this.taskScheduler = taskScheduler; + public boolean startCronTask(Long taskId, String taskType, String taskParam, String cronExpression) { + return startTask(taskId, taskType, taskParam, "CRON", cronExpression, null, null, null); } /** - * 注入 DeviceTask + * 启动固定频率任务 */ - @Autowired - public void setDeviceTask(DeviceTask deviceTask) { - this.deviceTask = deviceTask; + public boolean startFixedRateTask(Long taskId, String taskType, String taskParam, + long initialDelay, long period, TimeUnit timeUnit) { + return startTask(taskId, taskType, taskParam, "FIXED_RATE", null, + initialDelay, period, timeUnit); } /** - * 如果 Spring 容器中没有 TaskScheduler Bean, - * 可以在这里创建一个默认的 + * 启动固定延迟任务 */ - @PostConstruct - public void init() { - if (taskScheduler == null) { - logger.warn("TaskScheduler not found in context, creating default one"); - ThreadPoolTaskScheduler defaultScheduler = new ThreadPoolTaskScheduler(); - defaultScheduler.setPoolSize(10); - defaultScheduler.setThreadNamePrefix("device-task-"); - defaultScheduler.initialize(); - this.taskScheduler = defaultScheduler; - } + public boolean startFixedDelayTask(Long taskId, String taskType, String taskParam, + long initialDelay, long period, TimeUnit timeUnit) { + return startTask(taskId, taskType, taskParam, "FIXED_DELAY", null, + initialDelay, period, timeUnit); } /** - * 启动设备定时任务 - * 移除 static 修饰符 + * 通用启动任务方法 */ - public boolean startDeviceTask(Long deviceId, String deviceCode, String cronExpression) { + private boolean startTask(Long taskId, String taskType, String taskParam, + String scheduleType, String cronExpression, + Long initialDelay, Long period, TimeUnit timeUnit) { try { - // 先停止已存在的任务 - stopDeviceTask(deviceId); + // 停止已存在的任务 + stopTask(taskId); - // 验证 DeviceTask - if (deviceTask == null) { - logger.error("DeviceTask is not initialized"); + // 获取任务实例 + Task task = taskBeans.get(taskType); + if (task == null) { + logger.error("任务类型不存在: {}", taskType); return false; } - // 创建新的定时任务 - ScheduledFuture future = taskScheduler.schedule( - () -> { - try { - deviceTask.executeDeviceLogic(deviceId, deviceCode); - } catch (Exception e) { - logger.error("Device task execution failed for deviceId: {}", deviceId, e); - } - }, - new CronTrigger(cronExpression) - ); - - taskMap.put(deviceId, future); - logger.info("启动设备定时任务成功,设备ID: {}, cron表达式: {}", deviceId, cronExpression); - return true; + // 创建任务信息 + TaskInfo taskInfo = new TaskInfo(taskId, taskType, taskParam); + taskInfo.scheduleType = scheduleType; + taskInfo.cronExpression = cronExpression; + taskInfo.initialDelay = initialDelay; + taskInfo.period = period; + taskInfo.timeUnit = timeUnit; + taskInfos.put(taskId, taskInfo); + + // 根据调度类型创建任务 + ScheduledFuture future = null; + switch (scheduleType) { + case "CRON": + future = taskScheduler.schedule( + () -> task.execute(taskId, taskParam), + new CronTrigger(cronExpression) + ); + break; + + case "FIXED_RATE": + if (initialDelay != null && period != null && timeUnit != null) { + future = taskScheduler.scheduleAtFixedRate( + () -> task.execute(taskId, taskParam), + initialDelay + ); + } + break; + + case "FIXED_DELAY": + if (initialDelay != null && period != null && timeUnit != null) { + future = taskScheduler.scheduleWithFixedDelay( + () -> task.execute(taskId, taskParam), + initialDelay + ); + } + break; + } + + if (future != null) { + runningTasks.put(taskId, future); + logger.info("启动{}任务成功,任务ID: {}, 类型: {}, 参数: {}", + scheduleType, taskId, taskType, taskParam); + return true; + } + + return false; } catch (Exception e) { - logger.error("启动设备定时任务失败,设备ID: {}", deviceId, e); + logger.error("启动任务失败,任务ID: {}", taskId, e); return false; } } /** - * 停止设备定时任务 - * 移除 static 修饰符 + * 停止任务 */ - public boolean stopDeviceTask(Long deviceId) { + public boolean stopTask(Long taskId) { try { - ScheduledFuture future = taskMap.get(deviceId); + ScheduledFuture future = runningTasks.get(taskId); if (future != null) { future.cancel(true); - taskMap.remove(deviceId); - logger.info("停止设备定时任务成功,设备ID: {}", deviceId); + runningTasks.remove(taskId); + taskInfos.remove(taskId); + logger.info("停止任务成功,任务ID: {}", taskId); return true; - } else { - logger.info("设备定时任务不存在,设备ID: {}", deviceId); - return false; } + return false; } catch (Exception e) { - logger.error("停止设备定时任务失败,设备ID: {}", deviceId, e); + logger.error("停止任务失败,任务ID: {}", taskId, e); return false; } } /** - * 检查设备定时任务是否在运行 + * 获取任务信息 + */ + public TaskInfo getTaskInfo(Long taskId) { + return taskInfos.get(taskId); + } + + /** + * 检查任务是否运行 */ - public boolean isTaskRunning(Long deviceId) { - ScheduledFuture future = taskMap.get(deviceId); + public boolean isTaskRunning(Long taskId) { + ScheduledFuture future = runningTasks.get(taskId); return future != null && !future.isCancelled() && !future.isDone(); } /** - * 获取任务数量 + * 获取所有任务类型 */ - public int getTaskCount() { - return taskMap.size(); + public Map getAllTaskTypes() { + return new ConcurrentHashMap<>(taskBeans); + } + + /** + * 获取运行中的任务数量 + */ + public int getRunningTaskCount() { + return runningTasks.size(); } /** * 停止所有任务 */ public void stopAllTasks() { - logger.info("停止所有定时任务,共 {} 个", taskMap.size()); - for (Long deviceId : taskMap.keySet()) { - stopDeviceTask(deviceId); + logger.info("停止所有定时任务,共 {} 个", runningTasks.size()); + for (Long taskId : runningTasks.keySet()) { + stopTask(taskId); + } + } + +// /** +// * 原设备任务方法(保持兼容性) +// */ +// public boolean startDeviceTask(Long deviceId, String deviceCode, String cronExpression) { +// String taskParam = deviceId + ":" + deviceCode; +// return startCronTask(deviceId, TaskTypeEnum.DEVICE.getCode(), taskParam, cronExpression); +// } + + /** + * 启动设备任务 + */ + public boolean startDeviceTask(Long deviceId, String cronExpression) { + Long taskId = TaskTypeEnum.DEVICE.generateTaskId(deviceId); + return startCronTask(taskId, TaskTypeEnum.DEVICE.getCode(), + "device:" + deviceId, cronExpression); + } + + /** + * 停止设备任务 + */ + public boolean stopDeviceTask(Long deviceId) { + if (deviceId == null) { + logger.warn("设备ID不能为空"); + return false; } + + // 生成与启动时相同的任务ID + Long taskId = TaskTypeEnum.DEVICE.generateTaskId(deviceId); + logger.info("停止设备任务,设备ID: {}, 生成的任务ID: {}", deviceId, taskId); + + return stopTask(taskId); + } + + + /** + * 启动工单任务 + */ + public boolean startWorkOrderTask(Long configId, String cronExpression) { + Long taskId = TaskTypeEnum.WORK_ORDER.generateTaskId(configId); + return startCronTask(taskId, TaskTypeEnum.WORK_ORDER.getCode(), + "workOrder:" + configId, cronExpression); + } + + + /** + * 停止工单任务 + */ + public boolean stopWorkOrderTask(Long configId) { + if (configId == null) { + logger.warn("工单配置ID不能为空"); + return false; + } + + // 生成与启动时相同的任务ID + Long taskId = TaskTypeEnum.WORK_ORDER.generateTaskId(configId); + logger.info("停止工单任务,配置ID: {}, 生成的任务ID: {}", configId, taskId); + + return stopTask(taskId); } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java index fd6babce8e..16b5658ac5 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java @@ -406,23 +406,12 @@ public class DeviceServiceImpl implements DeviceService { deviceDO.setStatus(String.valueOf(DeviceConnectionStatusEnum.CONNECTED.getStatus())); deviceMapper.updateById(deviceDO); + String cronExpression = CronExpressionUtils.secondsToCron(deviceDO.getSampleCycle()); - //查询存储 - LambdaQueryWrapper deviceModelAttributeLambdaQueryWrapper = new LambdaQueryWrapper<>(); - deviceModelAttributeLambdaQueryWrapper.eq(DeviceContactModelDO::getDeviceId,createReqVO.getId()); - List deviceContactModelDOS = deviceContactModelMapper.selectList(deviceModelAttributeLambdaQueryWrapper); - - - if (deviceContactModelDOS != null && deviceContactModelDOS.size() > 0){ - for (DeviceContactModelDO deviceContactModelDO : deviceContactModelDOS) { - Object addressValue = OpcUtils.readValue(deviceContactModelDO.getAddress() != null ? deviceContactModelDO.getAddress() : ""); - deviceContactModelDO.setAddressValue(addressValue); - } - String json = JSON.toJSONString(deviceContactModelDOS); - tdengineService.insertDeviceData(createReqVO.getId(),json); - } - - + taskSchedulerManager.startDeviceTask( + deviceDO.getId(), + cronExpression + ); }else { @@ -433,6 +422,7 @@ public class DeviceServiceImpl implements DeviceService { if (disconnect){ deviceDO.setStatus(String.valueOf(DeviceConnectionStatusEnum.DISCONNECTED.getStatus())); deviceMapper.updateById(deviceDO); + taskSchedulerManager.stopDeviceTask(deviceDO.getId()); }else { throw exception(OPC_CLOSE_CONNECT_FAILURE); } @@ -440,15 +430,6 @@ public class DeviceServiceImpl implements DeviceService { throw exception(OPC_PARAMETER_DOES_NOT_EXIST); } - - - - - - - - - return Boolean.TRUE; } @@ -704,7 +685,6 @@ public class DeviceServiceImpl implements DeviceService { // 2. 启动定时任务 boolean success = taskSchedulerManager.startDeviceTask( deviceDO.getId(), - deviceDO.getDeviceCode(), cronExpression ); diff --git a/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/taskmanagement/TaskManagementController.java b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/taskmanagement/TaskManagementController.java index 3181e4e5b8..e8b36d2bd4 100644 --- a/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/taskmanagement/TaskManagementController.java +++ b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/taskmanagement/TaskManagementController.java @@ -121,15 +121,23 @@ public class TaskManagementController { return success(true); } + @PutMapping("/update-enabled") + @Operation(summary = "更新任务管理启用状态") + @PreAuthorize("@ss.hasPermission('mes:task-management:update')") + public CommonResult updateTaskManagementEnabled(@Valid @RequestBody TaskManagementUpdateEnabledReqVO updateEnabledReqVO) { + taskManagementService.updateTaskManagementEnabled(updateEnabledReqVO); + return success(true); + } + + private PageResult buildPageCreatorName(PageResult planMaintenanceRespVOPageResult) { for (TaskManagementRespVO planMaintenanceRespVO : planMaintenanceRespVOPageResult.getList()) { AdminUserRespDTO user = adminUserApi.getUser(Long.valueOf(planMaintenanceRespVO.getCreator())); - planMaintenanceRespVO.setCreatorName( "(" + user.getUsername()+ ")" + user.getNickname()); - + if (user!=null){ + planMaintenanceRespVO.setCreatorName( "(" + user.getUsername()+ ")" + user.getNickname()); + } } - - return planMaintenanceRespVOPageResult; } diff --git a/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/taskmanagement/scheduled/coretask/GenerateWorkOrderTask.java b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/taskmanagement/scheduled/coretask/GenerateWorkOrderTask.java new file mode 100644 index 0000000000..0db8c6c2d1 --- /dev/null +++ b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/taskmanagement/scheduled/coretask/GenerateWorkOrderTask.java @@ -0,0 +1,143 @@ +// DataSyncTask.java - 数据同步任务 +package cn.iocoder.yudao.module.mes.controller.admin.taskmanagement.scheduled.coretask; + +import cn.iocoder.yudao.module.iot.controller.admin.device.enums.TaskTypeEnum; +import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.core.Task; +import cn.iocoder.yudao.module.iot.service.device.TDengineService; +import cn.iocoder.yudao.module.mes.dal.dataobject.deviceledger.DeviceLedgerDO; +import cn.iocoder.yudao.module.mes.dal.dataobject.dvsubject.DvSubjectDO; +import cn.iocoder.yudao.module.mes.dal.dataobject.subjectplan.SubjectPlanDO; +import cn.iocoder.yudao.module.mes.dal.dataobject.taskmanagement.TaskManagementDO; +import cn.iocoder.yudao.module.mes.dal.dataobject.ticketmanagement.TicketManagementDO; +import cn.iocoder.yudao.module.mes.dal.dataobject.ticketresults.TicketResultsDO; +import cn.iocoder.yudao.module.mes.dal.mysql.deviceledger.DeviceLedgerMapper; +import cn.iocoder.yudao.module.mes.dal.mysql.dvsubject.DvSubjectMapper; +import cn.iocoder.yudao.module.mes.dal.mysql.subjectplan.SubjectPlanMapper; +import cn.iocoder.yudao.module.mes.dal.mysql.taskmanagement.TaskManagementMapper; +import cn.iocoder.yudao.module.mes.dal.mysql.ticketmanagement.TicketManagementMapper; +import cn.iocoder.yudao.module.mes.dal.mysql.ticketresults.TicketResultsMapper; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; + +import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; + +@Component +public class GenerateWorkOrderTask implements Task { + private static final Logger logger = LoggerFactory.getLogger(GenerateWorkOrderTask.class); + + @Resource + private TaskManagementMapper taskManagementMapper; + + @Resource + private DeviceLedgerMapper deviceLedgerMapper; + + @Resource + private TicketManagementMapper ticketManagementMapper; + + @Resource + private SubjectPlanMapper subjectPlanMapper; + + @Resource + private DvSubjectMapper dvSubjectMapper; + + @Resource + private TicketResultsMapper ticketResultsMapper; + + + @Override + public String getTaskType() { + return TaskTypeEnum.WORK_ORDER.getCode(); + } + + @Override + public void execute(Long taskId, String taskParam) { + try { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String currentTime = sdf.format(new Date()); + + logger.info("执行数据同步任务,任务ID: {}, 参数: {}, 时间: {}", + taskId, taskParam, currentTime); + + // 解析同步参数 + // 格式示例: "source:target" 或 "table:condition" + generateWorkOrder(taskId); + + } catch (Exception e) { + logger.error("执行数据同步任务异常,任务ID: {}", taskId, e); + } + } + + private void generateWorkOrder(Long taskId) { + + logger.info("开始同步数据,id: {}", taskId); + Long id = taskId -2000000L; + logger.info("处理后id:{} ", id ); + + //检验数据是否存在 + taskManagementMapper.selectById(id); + TaskManagementDO taskManagementDO = taskManagementMapper.selectById(id); + if (taskManagementDO == null){ + return; + } + + // 将逗号分隔的字符串转换为Long类型的List + List idList = Arrays.stream(taskManagementDO.getDeviceList().split(",")) + .map(String::trim) // 去除可能存在的空格 + .map(Long::valueOf) + .collect(Collectors.toList()); + + for (Long deviceId : idList) { + TicketManagementDO ticketManagementDO = new TicketManagementDO(); + DeviceLedgerDO deviceLedgerDO = deviceLedgerMapper.selectById(deviceId); + ticketManagementDO.setTaskId(taskManagementDO.getId()); + ticketManagementDO.setPlanNo(generatePrefixedOrderNo()); + ticketManagementDO.setPlanId(taskManagementDO.getProjectForm()); + ticketManagementDO.setDeviceName(deviceLedgerDO.getDeviceName()); + ticketManagementDO.setPlanType(taskManagementDO.getTaskType()); + ticketManagementDO.setConfigName(taskManagementDO.getName()); + ticketManagementDO.setTaskEndTime(taskManagementDO.getEndDate().atStartOfDay()); + // TODO 默认为内置管理员Id + ticketManagementDO.setCreator("1"); + ticketManagementDO.setUpdater("1"); + ticketManagementMapper.insert(ticketManagementDO); + + List subjectPlanDOList = subjectPlanMapper.selectList(Wrappers.lambdaQuery().eq(SubjectPlanDO::getPlanId, ticketManagementDO.getPlanId())); + for (SubjectPlanDO subjectPlanDO : subjectPlanDOList) { + DvSubjectDO dvSubjectDO = dvSubjectMapper.selectById(subjectPlanDO.getSubjectId()); + + TicketResultsDO ticketResultsDO = new TicketResultsDO(); + ticketResultsDO.setInspectionItemName(dvSubjectDO.getSubjectName()); + ticketResultsDO.setInspectionMethod(dvSubjectDO.getInspectionMethod()); + ticketResultsDO.setJudgmentCriteria(dvSubjectDO.getJudgmentCriteria()); + ticketResultsDO.setManagementId(ticketManagementDO.getId()); + ticketResultsDO.setDeviceId(deviceId); + // TODO 默认为内置管理员Id + ticketResultsDO.setCreator("1"); + ticketResultsDO.setUpdater("1"); + ticketResultsMapper.insert(ticketResultsDO); + } + + } + } + + /** + * 带前缀的时间戳单号 + */ + public static String generatePrefixedOrderNo() { + SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"); + String date = sdf.format(new Date()); + String randomNum = String.format("%06d", new Random().nextInt(1000000)); + return "E" + date + randomNum; + } + +} \ No newline at end of file diff --git a/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/taskmanagement/vo/TaskManagementUpdateEnabledReqVO.java b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/taskmanagement/vo/TaskManagementUpdateEnabledReqVO.java new file mode 100644 index 0000000000..f967ba78be --- /dev/null +++ b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/taskmanagement/vo/TaskManagementUpdateEnabledReqVO.java @@ -0,0 +1,19 @@ +package cn.iocoder.yudao.module.mes.controller.admin.taskmanagement.vo; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +import javax.validation.constraints.NotNull; + +@Schema(description = "任务管理 - 更新启用状态 Request VO") +@Data +public class TaskManagementUpdateEnabledReqVO { + + @Schema(description = "任务ID", required = true, example = "1024") + @NotNull(message = "任务ID不能为空") + private Long id; + + @Schema(description = "是否启用", required = true, example = "true") + @NotNull(message = "启用状态不能为空") + private Boolean enabled; +} \ No newline at end of file diff --git a/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/ticketmanagement/TicketManagementController.java b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/ticketmanagement/TicketManagementController.java index b2e90bbc86..f4600fe462 100644 --- a/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/ticketmanagement/TicketManagementController.java +++ b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/controller/admin/ticketmanagement/TicketManagementController.java @@ -115,8 +115,10 @@ public class TicketManagementController { private PageResult buildPageCreatorName(PageResult ticketManagementRespVOPageResult) { for (TicketManagementRespVO ticketManagementRespVO : ticketManagementRespVOPageResult.getList()) { - AdminUserRespDTO user = adminUserApi.getUser(Long.valueOf(ticketManagementRespVO.getCreator())); - ticketManagementRespVO.setCreatorName( "(" + user.getUsername()+ ")" + user.getNickname()); + if (ticketManagementRespVO.getCreator()!=null){ + AdminUserRespDTO user = adminUserApi.getUser(Long.valueOf(ticketManagementRespVO.getCreator())); + ticketManagementRespVO.setCreatorName( "(" + user.getUsername()+ ")" + user.getNickname()); + } if (ticketManagementRespVO.getOperator()!=null){ AdminUserRespDTO operator = adminUserApi.getUser(Long.valueOf(ticketManagementRespVO.getOperator())); ticketManagementRespVO.setOperatorName("(" + operator.getUsername()+ ")" + operator.getNickname()); diff --git a/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/dal/mysql/ticketmanagement/TicketManagementMapper.java b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/dal/mysql/ticketmanagement/TicketManagementMapper.java index 57785a8c07..03003327d3 100644 --- a/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/dal/mysql/ticketmanagement/TicketManagementMapper.java +++ b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/dal/mysql/ticketmanagement/TicketManagementMapper.java @@ -33,7 +33,7 @@ public interface TicketManagementMapper extends BaseMapperX .likeIfPresent(TicketManagementDO::getConfigName, reqVO.getConfigName()) .eqIfPresent(TicketManagementDO::getJobStatus, reqVO.getJobStatus()) .eqIfPresent(TicketManagementDO::getJobResult, reqVO.getJobResult()) - .orderByDesc(TicketManagementDO::getCreateTime); + .orderByDesc(TicketManagementDO::getId); // 单独处理 ids 条件 diff --git a/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/service/taskmanagement/TaskManagementService.java b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/service/taskmanagement/TaskManagementService.java index 618c9cc901..9f73dc2af1 100644 --- a/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/service/taskmanagement/TaskManagementService.java +++ b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/service/taskmanagement/TaskManagementService.java @@ -53,4 +53,6 @@ public interface TaskManagementService { PageResult getTaskManagementPage(TaskManagementPageReqVO pageReqVO); void createTicket(Long id); + + void updateTaskManagementEnabled(TaskManagementUpdateEnabledReqVO updateEnabledReqVO); } \ No newline at end of file diff --git a/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/service/taskmanagement/TaskManagementServiceImpl.java b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/service/taskmanagement/TaskManagementServiceImpl.java index e966396c47..3277c8183d 100644 --- a/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/service/taskmanagement/TaskManagementServiceImpl.java +++ b/yudao-module-mes/yudao-module-mes-biz/src/main/java/cn/iocoder/yudao/module/mes/service/taskmanagement/TaskManagementServiceImpl.java @@ -1,5 +1,7 @@ package cn.iocoder.yudao.module.mes.service.taskmanagement; +import cn.iocoder.yudao.module.iot.controller.admin.device.enums.TaskTypeEnum; +import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.scheduler.TaskSchedulerManager; import cn.iocoder.yudao.module.mes.dal.dataobject.deviceledger.DeviceLedgerDO; import cn.iocoder.yudao.module.mes.dal.dataobject.dvsubject.DvSubjectDO; import cn.iocoder.yudao.module.mes.dal.dataobject.planmaintenance.PlanMaintenanceDO; @@ -12,7 +14,9 @@ import cn.iocoder.yudao.module.mes.dal.mysql.planmaintenance.PlanMaintenanceMapp import cn.iocoder.yudao.module.mes.dal.mysql.subjectplan.SubjectPlanMapper; import cn.iocoder.yudao.module.mes.dal.mysql.ticketmanagement.TicketManagementMapper; import cn.iocoder.yudao.module.mes.dal.mysql.ticketresults.TicketResultsMapper; +import com.baomidou.mybatisplus.core.toolkit.StringUtils; import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import javax.annotation.Resource; import org.springframework.validation.annotation.Validated; @@ -40,6 +44,7 @@ import static cn.iocoder.yudao.module.mes.enums.ErrorCodeConstants.*; */ @Service @Validated +@Slf4j public class TaskManagementServiceImpl implements TaskManagementService { @Resource @@ -60,10 +65,12 @@ public class TaskManagementServiceImpl implements TaskManagementService { @Resource private SubjectPlanMapper subjectPlanMapper; - @Resource private DvSubjectMapper dvSubjectMapper; + @Resource + private TaskSchedulerManager taskSchedulerManager; + @Override public Long createTaskManagement(TaskManagementSaveReqVO createReqVO) { // 插入 @@ -121,8 +128,6 @@ public class TaskManagementServiceImpl implements TaskManagementService { @Override public void createTicket(Long id) { - List ticketManagementDOS = new ArrayList<>(); - //检验数据是否存在 validateTaskManagementExists(id); TaskManagementDO taskManagementDO = taskManagementMapper.selectById(id); @@ -136,7 +141,6 @@ public class TaskManagementServiceImpl implements TaskManagementService { .map(Long::valueOf) .collect(Collectors.toList()); - for (Long deviceId : idList) { TicketManagementDO ticketManagementDO = new TicketManagementDO(); DeviceLedgerDO deviceLedgerDO = deviceLedgerMapper.selectById(deviceId); @@ -149,7 +153,6 @@ public class TaskManagementServiceImpl implements TaskManagementService { ticketManagementDO.setTaskEndTime(taskManagementDO.getEndDate().atStartOfDay()); ticketManagementMapper.insert(ticketManagementDO); - List dvSubjectDOList = new ArrayList<>(); List subjectPlanDOList = subjectPlanMapper.selectList(Wrappers.lambdaQuery().eq(SubjectPlanDO::getPlanId, ticketManagementDO.getPlanId())); for (SubjectPlanDO subjectPlanDO : subjectPlanDOList) { DvSubjectDO dvSubjectDO = dvSubjectMapper.selectById(subjectPlanDO.getSubjectId()); @@ -167,6 +170,95 @@ public class TaskManagementServiceImpl implements TaskManagementService { } + @Override + public void updateTaskManagementEnabled(TaskManagementUpdateEnabledReqVO updateEnabledReqVO) { + // 1. 校验任务是否存在 + TaskManagementDO task = taskManagementMapper.selectById(updateEnabledReqVO.getId()); + if (task == null) { + throw exception(TASK_MANAGEMENT_NOT_EXISTS); + } + + // 2. 如果状态没有变化,直接返回 + if (Objects.equals(task.getEnabled(), updateEnabledReqVO.getEnabled())) { + return; + } + + // 3. 执行状态更新操作 + executeEnableUpdate(task, updateEnabledReqVO.getEnabled()); + + } + + /** + * 执行启用/禁用操作 + */ + private void executeEnableUpdate(TaskManagementDO task, Boolean enabled) { + // 更新数据库状态 + updateTaskStatusInDB(task.getId(), enabled); + + // 处理定时任务 + if (enabled) { + enableTaskSchedule(task); + } else { + disableTaskSchedule(task); + } + } + + + /** + * 启用任务调度 + */ + private void enableTaskSchedule(TaskManagementDO task) { + // 验证Cron表达式 + if (StringUtils.isBlank(task.getCronExpression())) { + log.warn("任务{}没有配置Cron表达式,无法启用调度", task.getId()); + return; + } + + // 启动定时任务 + try { + boolean success = taskSchedulerManager.startWorkOrderTask( + task.getId(), + task.getCronExpression() + ); + + if (success) { + log.info("任务{}调度启动成功", task.getId()); + } else { + log.error("任务{}调度启动失败", task.getId()); + // 可以记录失败日志或发送通知 + } + } catch (Exception e) { + log.error("任务{}调度启动异常", task.getId(), e); + // 可以考虑回滚数据库状态 + } + } + + /** + * 禁用任务调度 + */ + private void disableTaskSchedule(TaskManagementDO task) { + try { + boolean success = taskSchedulerManager.stopWorkOrderTask(task.getId()); + + if (success) { + log.info("任务{}调度停止成功", task.getId()); + } else { + log.warn("任务{}调度停止失败,可能任务不存在或已停止", task.getId()); + } + } catch (Exception e) { + log.error("任务{}调度停止异常", task.getId(), e); + } + } + /** + * 更新数据库状态 + */ + private void updateTaskStatusInDB(Long taskId, Boolean enabled) { + TaskManagementDO updateObj = new TaskManagementDO(); + updateObj.setId(taskId); + updateObj.setEnabled(enabled); + taskManagementMapper.updateById(updateObj); + } + /** * 带前缀的时间戳单号