Merge branch 'main' into plp

plp
86158 3 weeks ago
commit dbc3bd7db5

@ -0,0 +1,60 @@
// TaskTypeEnum.java
package cn.iocoder.yudao.module.iot.controller.admin.device.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public enum TaskTypeEnum {
DEVICE("DEVICE", "设备数据采集"),
WORK_ORDER("WORK_ORDER", "工单生成");
/**
*
*/
private final String code;
/**
*
*/
private final String name;
/**
*
*/
public static TaskTypeEnum getByCode(String code) {
for (TaskTypeEnum type : values()) {
if (type.getCode().equals(code)) {
return type;
}
}
return null;
}
/**
*
*/
public static boolean contains(String code) {
return getByCode(code) != null;
}
/**
* ID
*/
public Long generateTaskId(Long baseId) {
if (baseId == null) {
baseId = 0L;
}
switch (this) {
case DEVICE:
return 1000000L + baseId;
case WORK_ORDER:
return 2000000L + baseId;
default:
return 9000000L + baseId;
}
}
}

@ -13,11 +13,16 @@ public class SchedulerConfig {
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(20); // 设置线程池大小
scheduler.setThreadNamePrefix("scheduled-task-"); // 线程名前缀
scheduler.setAwaitTerminationSeconds(60); // 等待任务完成的秒数
scheduler.setWaitForTasksToCompleteOnShutdown(true); // 关闭时等待任务完成
scheduler.initialize(); // 初始化
scheduler.setPoolSize(50); // 增加线程数
scheduler.setThreadNamePrefix("scheduled-task-");
scheduler.setAwaitTerminationSeconds(60);
scheduler.setWaitForTasksToCompleteOnShutdown(true);
// 设置队列容量
scheduler.setPoolSize(50);
scheduler.setThreadPriority(Thread.NORM_PRIORITY);
scheduler.setDaemon(false);
scheduler.initialize();
return scheduler;
}
}

@ -0,0 +1,14 @@
package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.core;
public interface Task {
/**
*
*/
void execute(Long taskId, String taskParam);
/**
*
*/
String getTaskType();
}

@ -1,39 +1,196 @@
// DeviceTask.java - 原有设备任务
package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.coretask;
import cn.iocoder.yudao.framework.common.util.opc.OpcUtils;
import cn.iocoder.yudao.module.iot.controller.admin.device.enums.TaskTypeEnum;
import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.core.Task;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceDO;
import cn.iocoder.yudao.module.iot.dal.devicecontactmodel.DeviceContactModelDO;
import cn.iocoder.yudao.module.iot.dal.mysql.device.DeviceMapper;
import cn.iocoder.yudao.module.iot.dal.mysql.devicecontactmodel.DeviceContactModelMapper;
import cn.iocoder.yudao.module.iot.service.device.TDengineService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@Component
public class DeviceTask {
@Slf4j
public class DeviceTask implements Task {
private static final Logger logger = LoggerFactory.getLogger(DeviceTask.class);
/**
*
* @param deviceId ID
* @param deviceCode
*/
public void executeDeviceLogic(Long deviceId, String deviceCode) {
@Resource
private DeviceContactModelMapper deviceContactModelMapper;
@Resource
private DeviceMapper deviceMapper;
@Resource
private TDengineService tDengineService;
@Override
public String getTaskType() {
return TaskTypeEnum.DEVICE.getCode();
}
@Override
public void execute(Long taskId, String taskParam) {
try {
// 创建时间格式化对象
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String currentTime = sdf.format(new Date());
logger.info("执行设备任务设备ID: {}, 设备编码: {}, 时间: {}",
deviceId, deviceCode, currentTime);
logger.info("执行设备任务任务ID: {}, 参数: {}, 时间: {}",
taskId, taskParam, currentTime);
// 解析参数,假设格式为 deviceId:deviceCode
// String[] params = taskParam.split(":");
// if (params.length >= 2) {
// Long deviceId = Long.parseLong(params[0]);
// String deviceCode = params[1];
// executeDeviceLogic(deviceId, deviceCode);
// }
executeDeviceLogic(taskId,taskParam);
// TODO: 这里编写具体的设备执行逻辑
// 比如:读取设备数据、发送指令、处理响应等
} catch (Exception e) {
// 异常信息中也加入时间戳
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String errorTime = sdf.format(new Date());
logger.error("执行设备任务异常任务ID: {}", taskId, e);
}
}
/**
*
*/
private void executeDeviceLogic(Long sourceDeviceId, String param) {
logger.info("执行设备 {} 的具体逻辑,参数: {}", sourceDeviceId, param);
Long deviceId = sourceDeviceId - 1000000L;
logger.info("处理后id{} ", deviceId );
// 1. 参数校验
if (deviceId == null){
logger.error("设备ID不能为空");
throw new RuntimeException("设备ID不能为空");
}
// 2. 查询设备信息
DeviceDO deviceDO = deviceMapper.selectById(deviceId);
if (deviceDO == null) {
logger.error("设备不存在设备ID: {}", deviceId);
throw new RuntimeException("设备不存在设备ID: " + deviceId);
}
if (deviceDO.getUrl() == null || deviceDO.getUrl().trim().isEmpty()) {
logger.error("设备URL不能为空设备ID: {}", deviceId);
throw new RuntimeException("设备URL不能为空");
}
// 3. 连接OPC服务器
String username = deviceDO.getUsername() != null ? deviceDO.getUsername() : "";
String password = deviceDO.getPassword() != null ? deviceDO.getPassword() : "";
logger.error("执行设备任务异常设备ID: {}, 异常时间: {}",
deviceId, errorTime, e);
boolean connected = OpcUtils.connect(deviceDO.getUrl(), username, password, 10);
if (!connected) {
logger.error("连接OPC服务器失败设备ID: {}URL: {}", deviceId, deviceDO.getUrl());
throw new RuntimeException("连接OPC服务器失败");
}
// 4. 查询设备点位配置
LambdaQueryWrapper<DeviceContactModelDO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(DeviceContactModelDO::getDeviceId, deviceId);
List<DeviceContactModelDO> deviceContactModelDOS = deviceContactModelMapper.selectList(queryWrapper);
// 5. 判断是否有点位数据
if (deviceContactModelDOS == null || deviceContactModelDOS.isEmpty()) {
logger.warn("设备 {} 没有配置数据点位,跳过数据读取", deviceId);
return;
}
logger.info("设备 {} 共有 {} 个点位需要读取", deviceId, deviceContactModelDOS.size());
// 6. 读取OPC数据
int successCount = 0;
List<DeviceContactModelDO> validDataList = new ArrayList<>();
for (DeviceContactModelDO deviceContactModelDO : deviceContactModelDOS) {
try {
// 判断点位地址是否有效
String address = deviceContactModelDO.getAddress();
if (address == null || address.trim().isEmpty()) {
logger.warn("点位ID {} 的地址为空,跳过", deviceContactModelDO.getId());
continue;
}
// 读取OPC值
Object addressValue = OpcUtils.readValue(address);
if (addressValue == null) {
logger.warn("读取点位 {} 的值返回null地址: {}",
deviceContactModelDO.getId(), address);
} else {
// 值验证
if (addressValue instanceof String) {
String strValue = (String) addressValue;
if (strValue.trim().isEmpty()) {
logger.warn("读取点位 {} 的值为空字符串", deviceContactModelDO.getId());
deviceContactModelDO.setAddressValue("");
} else {
deviceContactModelDO.setAddressValue(addressValue);
successCount++;
}
} else {
deviceContactModelDO.setAddressValue(addressValue);
successCount++;
}
}
validDataList.add(deviceContactModelDO);
} catch (Exception e) {
logger.error("读取点位 {} 异常,地址: {}",
deviceContactModelDO.getId(),
deviceContactModelDO.getAddress(), e);
}
}
// 7. 判断是否有有效数据
if (validDataList.isEmpty()) {
logger.warn("设备 {} 没有读取到任何有效数据,跳过入库", deviceId);
return;
}
logger.info("设备 {} 成功读取 {} 个点位数据,总计 {} 个点位",
deviceId, successCount, validDataList.size());
try {
// 8. 数据入库
String json = JSON.toJSONString(validDataList);
boolean insertSuccess = tDengineService.insertDeviceData(deviceId, json);
if (insertSuccess) {
logger.info("设备 {} 数据成功插入TDengine数据量: {}", deviceId, validDataList.size());
} else {
logger.error("设备 {} 数据插入TDengine失败", deviceId);
}
} catch (Exception e) {
logger.error("设备 {} 数据入库异常", deviceId, e);
} finally {
// 9. 确保断开连接
try {
OpcUtils.disconnect();
} catch (Exception e) {
logger.error("断开OPC连接异常设备ID: {}", deviceId, e);
}
}
}
}

@ -1,142 +1,283 @@
// TaskSchedulerManager.java
package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.scheduler;
import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.coretask.DeviceTask;
import cn.iocoder.yudao.module.iot.controller.admin.device.enums.TaskTypeEnum;
import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.core.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@Component
public class TaskSchedulerManager {
private static final Logger logger = LoggerFactory.getLogger(TaskSchedulerManager.class);
private TaskScheduler taskScheduler; // 移除 static
private DeviceTask deviceTask; // 移除 static
private final Map<Long, ScheduledFuture<?>> taskMap = new ConcurrentHashMap<>();
private final TaskScheduler taskScheduler;
// 存储所有任务
private final Map<String, Task> taskBeans = new ConcurrentHashMap<>();
// 存储正在运行的任务
private final Map<Long, ScheduledFuture<?>> runningTasks = new ConcurrentHashMap<>();
private final Map<Long, TaskInfo> taskInfos = new ConcurrentHashMap<>();
// 任务信息类
private static class TaskInfo {
private Long taskId;
private String taskType;
private String taskParam;
private String cronExpression;
private String scheduleType; // "CRON" 或 "FIXED_RATE" 或 "FIXED_DELAY"
private Long initialDelay;
private Long period;
private TimeUnit timeUnit;
// 构造器
public TaskInfo(Long taskId, String taskType, String taskParam) {
this.taskId = taskId;
this.taskType = taskType;
this.taskParam = taskParam;
}
// getters and setters...
}
// 通过构造器注入
public TaskSchedulerManager(TaskScheduler taskScheduler,
@Autowired(required = false) java.util.List<Task> tasks) {
this.taskScheduler = taskScheduler;
// 注册所有任务
if (tasks != null) {
for (Task task : tasks) {
taskBeans.put(task.getTaskType(), task);
logger.info("注册任务类型: {}", task.getTaskType());
}
}
}
/**
* TaskScheduler
* Spring TaskScheduler Bean
* Cron
*/
@Autowired
public TaskSchedulerManager(TaskScheduler taskScheduler) {
this.taskScheduler = taskScheduler;
public boolean startCronTask(Long taskId, String taskType, String taskParam, String cronExpression) {
return startTask(taskId, taskType, taskParam, "CRON", cronExpression, null, null, null);
}
/**
* DeviceTask
*
*/
@Autowired
public void setDeviceTask(DeviceTask deviceTask) {
this.deviceTask = deviceTask;
public boolean startFixedRateTask(Long taskId, String taskType, String taskParam,
long initialDelay, long period, TimeUnit timeUnit) {
return startTask(taskId, taskType, taskParam, "FIXED_RATE", null,
initialDelay, period, timeUnit);
}
/**
* Spring TaskScheduler Bean
*
*
*/
@PostConstruct
public void init() {
if (taskScheduler == null) {
logger.warn("TaskScheduler not found in context, creating default one");
ThreadPoolTaskScheduler defaultScheduler = new ThreadPoolTaskScheduler();
defaultScheduler.setPoolSize(10);
defaultScheduler.setThreadNamePrefix("device-task-");
defaultScheduler.initialize();
this.taskScheduler = defaultScheduler;
}
public boolean startFixedDelayTask(Long taskId, String taskType, String taskParam,
long initialDelay, long period, TimeUnit timeUnit) {
return startTask(taskId, taskType, taskParam, "FIXED_DELAY", null,
initialDelay, period, timeUnit);
}
/**
*
* static
*
*/
public boolean startDeviceTask(Long deviceId, String deviceCode, String cronExpression) {
private boolean startTask(Long taskId, String taskType, String taskParam,
String scheduleType, String cronExpression,
Long initialDelay, Long period, TimeUnit timeUnit) {
try {
// 先停止已存在的任务
stopDeviceTask(deviceId);
// 停止已存在的任务
stopTask(taskId);
// 验证 DeviceTask
if (deviceTask == null) {
logger.error("DeviceTask is not initialized");
// 获取任务实例
Task task = taskBeans.get(taskType);
if (task == null) {
logger.error("任务类型不存在: {}", taskType);
return false;
}
// 创建新的定时任务
ScheduledFuture<?> future = taskScheduler.schedule(
() -> {
try {
deviceTask.executeDeviceLogic(deviceId, deviceCode);
} catch (Exception e) {
logger.error("Device task execution failed for deviceId: {}", deviceId, e);
}
},
new CronTrigger(cronExpression)
);
taskMap.put(deviceId, future);
logger.info("启动设备定时任务成功设备ID: {}, cron表达式: {}", deviceId, cronExpression);
return true;
// 创建任务信息
TaskInfo taskInfo = new TaskInfo(taskId, taskType, taskParam);
taskInfo.scheduleType = scheduleType;
taskInfo.cronExpression = cronExpression;
taskInfo.initialDelay = initialDelay;
taskInfo.period = period;
taskInfo.timeUnit = timeUnit;
taskInfos.put(taskId, taskInfo);
// 根据调度类型创建任务
ScheduledFuture<?> future = null;
switch (scheduleType) {
case "CRON":
future = taskScheduler.schedule(
() -> task.execute(taskId, taskParam),
new CronTrigger(cronExpression)
);
break;
case "FIXED_RATE":
if (initialDelay != null && period != null && timeUnit != null) {
future = taskScheduler.scheduleAtFixedRate(
() -> task.execute(taskId, taskParam),
initialDelay
);
}
break;
case "FIXED_DELAY":
if (initialDelay != null && period != null && timeUnit != null) {
future = taskScheduler.scheduleWithFixedDelay(
() -> task.execute(taskId, taskParam),
initialDelay
);
}
break;
}
if (future != null) {
runningTasks.put(taskId, future);
logger.info("启动{}任务成功任务ID: {}, 类型: {}, 参数: {}",
scheduleType, taskId, taskType, taskParam);
return true;
}
return false;
} catch (Exception e) {
logger.error("启动设备定时任务失败设备ID: {}", deviceId, e);
logger.error("启动任务失败任务ID: {}", taskId, e);
return false;
}
}
/**
*
* static
*
*/
public boolean stopDeviceTask(Long deviceId) {
public boolean stopTask(Long taskId) {
try {
ScheduledFuture<?> future = taskMap.get(deviceId);
ScheduledFuture<?> future = runningTasks.get(taskId);
if (future != null) {
future.cancel(true);
taskMap.remove(deviceId);
logger.info("停止设备定时任务成功设备ID: {}", deviceId);
runningTasks.remove(taskId);
taskInfos.remove(taskId);
logger.info("停止任务成功任务ID: {}", taskId);
return true;
} else {
logger.info("设备定时任务不存在设备ID: {}", deviceId);
return false;
}
return false;
} catch (Exception e) {
logger.error("停止设备定时任务失败设备ID: {}", deviceId, e);
logger.error("停止任务失败任务ID: {}", taskId, e);
return false;
}
}
/**
*
*
*/
public TaskInfo getTaskInfo(Long taskId) {
return taskInfos.get(taskId);
}
/**
*
*/
public boolean isTaskRunning(Long deviceId) {
ScheduledFuture<?> future = taskMap.get(deviceId);
public boolean isTaskRunning(Long taskId) {
ScheduledFuture<?> future = runningTasks.get(taskId);
return future != null && !future.isCancelled() && !future.isDone();
}
/**
*
*
*/
public int getTaskCount() {
return taskMap.size();
public Map<String, Task> getAllTaskTypes() {
return new ConcurrentHashMap<>(taskBeans);
}
/**
*
*/
public int getRunningTaskCount() {
return runningTasks.size();
}
/**
*
*/
public void stopAllTasks() {
logger.info("停止所有定时任务,共 {} 个", taskMap.size());
for (Long deviceId : taskMap.keySet()) {
stopDeviceTask(deviceId);
logger.info("停止所有定时任务,共 {} 个", runningTasks.size());
for (Long taskId : runningTasks.keySet()) {
stopTask(taskId);
}
}
// /**
// * 原设备任务方法(保持兼容性)
// */
// public boolean startDeviceTask(Long deviceId, String deviceCode, String cronExpression) {
// String taskParam = deviceId + ":" + deviceCode;
// return startCronTask(deviceId, TaskTypeEnum.DEVICE.getCode(), taskParam, cronExpression);
// }
/**
*
*/
public boolean startDeviceTask(Long deviceId, String cronExpression) {
Long taskId = TaskTypeEnum.DEVICE.generateTaskId(deviceId);
return startCronTask(taskId, TaskTypeEnum.DEVICE.getCode(),
"device:" + deviceId, cronExpression);
}
/**
*
*/
public boolean stopDeviceTask(Long deviceId) {
if (deviceId == null) {
logger.warn("设备ID不能为空");
return false;
}
// 生成与启动时相同的任务ID
Long taskId = TaskTypeEnum.DEVICE.generateTaskId(deviceId);
logger.info("停止设备任务设备ID: {}, 生成的任务ID: {}", deviceId, taskId);
return stopTask(taskId);
}
/**
*
*/
public boolean startWorkOrderTask(Long configId, String cronExpression) {
Long taskId = TaskTypeEnum.WORK_ORDER.generateTaskId(configId);
return startCronTask(taskId, TaskTypeEnum.WORK_ORDER.getCode(),
"workOrder:" + configId, cronExpression);
}
/**
*
*/
public boolean stopWorkOrderTask(Long configId) {
if (configId == null) {
logger.warn("工单配置ID不能为空");
return false;
}
// 生成与启动时相同的任务ID
Long taskId = TaskTypeEnum.WORK_ORDER.generateTaskId(configId);
logger.info("停止工单任务配置ID: {}, 生成的任务ID: {}", configId, taskId);
return stopTask(taskId);
}
}

@ -406,23 +406,12 @@ public class DeviceServiceImpl implements DeviceService {
deviceDO.setStatus(String.valueOf(DeviceConnectionStatusEnum.CONNECTED.getStatus()));
deviceMapper.updateById(deviceDO);
String cronExpression = CronExpressionUtils.secondsToCron(deviceDO.getSampleCycle());
//查询存储
LambdaQueryWrapper<DeviceContactModelDO> deviceModelAttributeLambdaQueryWrapper = new LambdaQueryWrapper<>();
deviceModelAttributeLambdaQueryWrapper.eq(DeviceContactModelDO::getDeviceId,createReqVO.getId());
List<DeviceContactModelDO> deviceContactModelDOS = deviceContactModelMapper.selectList(deviceModelAttributeLambdaQueryWrapper);
if (deviceContactModelDOS != null && deviceContactModelDOS.size() > 0){
for (DeviceContactModelDO deviceContactModelDO : deviceContactModelDOS) {
Object addressValue = OpcUtils.readValue(deviceContactModelDO.getAddress() != null ? deviceContactModelDO.getAddress() : "");
deviceContactModelDO.setAddressValue(addressValue);
}
String json = JSON.toJSONString(deviceContactModelDOS);
tdengineService.insertDeviceData(createReqVO.getId(),json);
}
taskSchedulerManager.startDeviceTask(
deviceDO.getId(),
cronExpression
);
}else {
@ -433,6 +422,7 @@ public class DeviceServiceImpl implements DeviceService {
if (disconnect){
deviceDO.setStatus(String.valueOf(DeviceConnectionStatusEnum.DISCONNECTED.getStatus()));
deviceMapper.updateById(deviceDO);
taskSchedulerManager.stopDeviceTask(deviceDO.getId());
}else {
throw exception(OPC_CLOSE_CONNECT_FAILURE);
}
@ -440,15 +430,6 @@ public class DeviceServiceImpl implements DeviceService {
throw exception(OPC_PARAMETER_DOES_NOT_EXIST);
}
return Boolean.TRUE;
}
@ -704,7 +685,6 @@ public class DeviceServiceImpl implements DeviceService {
// 2. 启动定时任务
boolean success = taskSchedulerManager.startDeviceTask(
deviceDO.getId(),
deviceDO.getDeviceCode(),
cronExpression
);

@ -121,15 +121,23 @@ public class TaskManagementController {
return success(true);
}
@PutMapping("/update-enabled")
@Operation(summary = "更新任务管理启用状态")
@PreAuthorize("@ss.hasPermission('mes:task-management:update')")
public CommonResult<Boolean> updateTaskManagementEnabled(@Valid @RequestBody TaskManagementUpdateEnabledReqVO updateEnabledReqVO) {
taskManagementService.updateTaskManagementEnabled(updateEnabledReqVO);
return success(true);
}
private PageResult<TaskManagementRespVO> buildPageCreatorName(PageResult<TaskManagementRespVO> planMaintenanceRespVOPageResult) {
for (TaskManagementRespVO planMaintenanceRespVO : planMaintenanceRespVOPageResult.getList()) {
AdminUserRespDTO user = adminUserApi.getUser(Long.valueOf(planMaintenanceRespVO.getCreator()));
planMaintenanceRespVO.setCreatorName( "(" + user.getUsername()+ ")" + user.getNickname());
if (user!=null){
planMaintenanceRespVO.setCreatorName( "(" + user.getUsername()+ ")" + user.getNickname());
}
}
return planMaintenanceRespVOPageResult;
}

@ -0,0 +1,143 @@
// DataSyncTask.java - 数据同步任务
package cn.iocoder.yudao.module.mes.controller.admin.taskmanagement.scheduled.coretask;
import cn.iocoder.yudao.module.iot.controller.admin.device.enums.TaskTypeEnum;
import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.core.Task;
import cn.iocoder.yudao.module.iot.service.device.TDengineService;
import cn.iocoder.yudao.module.mes.dal.dataobject.deviceledger.DeviceLedgerDO;
import cn.iocoder.yudao.module.mes.dal.dataobject.dvsubject.DvSubjectDO;
import cn.iocoder.yudao.module.mes.dal.dataobject.subjectplan.SubjectPlanDO;
import cn.iocoder.yudao.module.mes.dal.dataobject.taskmanagement.TaskManagementDO;
import cn.iocoder.yudao.module.mes.dal.dataobject.ticketmanagement.TicketManagementDO;
import cn.iocoder.yudao.module.mes.dal.dataobject.ticketresults.TicketResultsDO;
import cn.iocoder.yudao.module.mes.dal.mysql.deviceledger.DeviceLedgerMapper;
import cn.iocoder.yudao.module.mes.dal.mysql.dvsubject.DvSubjectMapper;
import cn.iocoder.yudao.module.mes.dal.mysql.subjectplan.SubjectPlanMapper;
import cn.iocoder.yudao.module.mes.dal.mysql.taskmanagement.TaskManagementMapper;
import cn.iocoder.yudao.module.mes.dal.mysql.ticketmanagement.TicketManagementMapper;
import cn.iocoder.yudao.module.mes.dal.mysql.ticketresults.TicketResultsMapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
@Component
public class GenerateWorkOrderTask implements Task {
private static final Logger logger = LoggerFactory.getLogger(GenerateWorkOrderTask.class);
@Resource
private TaskManagementMapper taskManagementMapper;
@Resource
private DeviceLedgerMapper deviceLedgerMapper;
@Resource
private TicketManagementMapper ticketManagementMapper;
@Resource
private SubjectPlanMapper subjectPlanMapper;
@Resource
private DvSubjectMapper dvSubjectMapper;
@Resource
private TicketResultsMapper ticketResultsMapper;
@Override
public String getTaskType() {
return TaskTypeEnum.WORK_ORDER.getCode();
}
@Override
public void execute(Long taskId, String taskParam) {
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String currentTime = sdf.format(new Date());
logger.info("执行数据同步任务任务ID: {}, 参数: {}, 时间: {}",
taskId, taskParam, currentTime);
// 解析同步参数
// 格式示例: "source:target" 或 "table:condition"
generateWorkOrder(taskId);
} catch (Exception e) {
logger.error("执行数据同步任务异常任务ID: {}", taskId, e);
}
}
private void generateWorkOrder(Long taskId) {
logger.info("开始同步数据id: {}", taskId);
Long id = taskId -2000000L;
logger.info("处理后id{} ", id );
//检验数据是否存在
taskManagementMapper.selectById(id);
TaskManagementDO taskManagementDO = taskManagementMapper.selectById(id);
if (taskManagementDO == null){
return;
}
// 将逗号分隔的字符串转换为Long类型的List
List<Long> idList = Arrays.stream(taskManagementDO.getDeviceList().split(","))
.map(String::trim) // 去除可能存在的空格
.map(Long::valueOf)
.collect(Collectors.toList());
for (Long deviceId : idList) {
TicketManagementDO ticketManagementDO = new TicketManagementDO();
DeviceLedgerDO deviceLedgerDO = deviceLedgerMapper.selectById(deviceId);
ticketManagementDO.setTaskId(taskManagementDO.getId());
ticketManagementDO.setPlanNo(generatePrefixedOrderNo());
ticketManagementDO.setPlanId(taskManagementDO.getProjectForm());
ticketManagementDO.setDeviceName(deviceLedgerDO.getDeviceName());
ticketManagementDO.setPlanType(taskManagementDO.getTaskType());
ticketManagementDO.setConfigName(taskManagementDO.getName());
ticketManagementDO.setTaskEndTime(taskManagementDO.getEndDate().atStartOfDay());
// TODO 默认为内置管理员Id
ticketManagementDO.setCreator("1");
ticketManagementDO.setUpdater("1");
ticketManagementMapper.insert(ticketManagementDO);
List<SubjectPlanDO> subjectPlanDOList = subjectPlanMapper.selectList(Wrappers.<SubjectPlanDO>lambdaQuery().eq(SubjectPlanDO::getPlanId, ticketManagementDO.getPlanId()));
for (SubjectPlanDO subjectPlanDO : subjectPlanDOList) {
DvSubjectDO dvSubjectDO = dvSubjectMapper.selectById(subjectPlanDO.getSubjectId());
TicketResultsDO ticketResultsDO = new TicketResultsDO();
ticketResultsDO.setInspectionItemName(dvSubjectDO.getSubjectName());
ticketResultsDO.setInspectionMethod(dvSubjectDO.getInspectionMethod());
ticketResultsDO.setJudgmentCriteria(dvSubjectDO.getJudgmentCriteria());
ticketResultsDO.setManagementId(ticketManagementDO.getId());
ticketResultsDO.setDeviceId(deviceId);
// TODO 默认为内置管理员Id
ticketResultsDO.setCreator("1");
ticketResultsDO.setUpdater("1");
ticketResultsMapper.insert(ticketResultsDO);
}
}
}
/**
*
*/
public static String generatePrefixedOrderNo() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
String date = sdf.format(new Date());
String randomNum = String.format("%06d", new Random().nextInt(1000000));
return "E" + date + randomNum;
}
}

@ -0,0 +1,19 @@
package cn.iocoder.yudao.module.mes.controller.admin.taskmanagement.vo;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import javax.validation.constraints.NotNull;
@Schema(description = "任务管理 - 更新启用状态 Request VO")
@Data
public class TaskManagementUpdateEnabledReqVO {
@Schema(description = "任务ID", required = true, example = "1024")
@NotNull(message = "任务ID不能为空")
private Long id;
@Schema(description = "是否启用", required = true, example = "true")
@NotNull(message = "启用状态不能为空")
private Boolean enabled;
}

@ -115,8 +115,10 @@ public class TicketManagementController {
private PageResult<TicketManagementRespVO> buildPageCreatorName(PageResult<TicketManagementRespVO> ticketManagementRespVOPageResult) {
for (TicketManagementRespVO ticketManagementRespVO : ticketManagementRespVOPageResult.getList()) {
AdminUserRespDTO user = adminUserApi.getUser(Long.valueOf(ticketManagementRespVO.getCreator()));
ticketManagementRespVO.setCreatorName( "(" + user.getUsername()+ ")" + user.getNickname());
if (ticketManagementRespVO.getCreator()!=null){
AdminUserRespDTO user = adminUserApi.getUser(Long.valueOf(ticketManagementRespVO.getCreator()));
ticketManagementRespVO.setCreatorName( "(" + user.getUsername()+ ")" + user.getNickname());
}
if (ticketManagementRespVO.getOperator()!=null){
AdminUserRespDTO operator = adminUserApi.getUser(Long.valueOf(ticketManagementRespVO.getOperator()));
ticketManagementRespVO.setOperatorName("(" + operator.getUsername()+ ")" + operator.getNickname());

@ -33,7 +33,7 @@ public interface TicketManagementMapper extends BaseMapperX<TicketManagementDO>
.likeIfPresent(TicketManagementDO::getConfigName, reqVO.getConfigName())
.eqIfPresent(TicketManagementDO::getJobStatus, reqVO.getJobStatus())
.eqIfPresent(TicketManagementDO::getJobResult, reqVO.getJobResult())
.orderByDesc(TicketManagementDO::getCreateTime);
.orderByDesc(TicketManagementDO::getId);
// 单独处理 ids 条件

@ -53,4 +53,6 @@ public interface TaskManagementService {
PageResult<TaskManagementDO> getTaskManagementPage(TaskManagementPageReqVO pageReqVO);
void createTicket(Long id);
void updateTaskManagementEnabled(TaskManagementUpdateEnabledReqVO updateEnabledReqVO);
}

@ -1,5 +1,7 @@
package cn.iocoder.yudao.module.mes.service.taskmanagement;
import cn.iocoder.yudao.module.iot.controller.admin.device.enums.TaskTypeEnum;
import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.scheduler.TaskSchedulerManager;
import cn.iocoder.yudao.module.mes.dal.dataobject.deviceledger.DeviceLedgerDO;
import cn.iocoder.yudao.module.mes.dal.dataobject.dvsubject.DvSubjectDO;
import cn.iocoder.yudao.module.mes.dal.dataobject.planmaintenance.PlanMaintenanceDO;
@ -12,7 +14,9 @@ import cn.iocoder.yudao.module.mes.dal.mysql.planmaintenance.PlanMaintenanceMapp
import cn.iocoder.yudao.module.mes.dal.mysql.subjectplan.SubjectPlanMapper;
import cn.iocoder.yudao.module.mes.dal.mysql.ticketmanagement.TicketManagementMapper;
import cn.iocoder.yudao.module.mes.dal.mysql.ticketresults.TicketResultsMapper;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import org.springframework.validation.annotation.Validated;
@ -40,6 +44,7 @@ import static cn.iocoder.yudao.module.mes.enums.ErrorCodeConstants.*;
*/
@Service
@Validated
@Slf4j
public class TaskManagementServiceImpl implements TaskManagementService {
@Resource
@ -60,10 +65,12 @@ public class TaskManagementServiceImpl implements TaskManagementService {
@Resource
private SubjectPlanMapper subjectPlanMapper;
@Resource
private DvSubjectMapper dvSubjectMapper;
@Resource
private TaskSchedulerManager taskSchedulerManager;
@Override
public Long createTaskManagement(TaskManagementSaveReqVO createReqVO) {
// 插入
@ -121,8 +128,6 @@ public class TaskManagementServiceImpl implements TaskManagementService {
@Override
public void createTicket(Long id) {
List<TicketManagementDO> ticketManagementDOS = new ArrayList<>();
//检验数据是否存在
validateTaskManagementExists(id);
TaskManagementDO taskManagementDO = taskManagementMapper.selectById(id);
@ -136,7 +141,6 @@ public class TaskManagementServiceImpl implements TaskManagementService {
.map(Long::valueOf)
.collect(Collectors.toList());
for (Long deviceId : idList) {
TicketManagementDO ticketManagementDO = new TicketManagementDO();
DeviceLedgerDO deviceLedgerDO = deviceLedgerMapper.selectById(deviceId);
@ -149,7 +153,6 @@ public class TaskManagementServiceImpl implements TaskManagementService {
ticketManagementDO.setTaskEndTime(taskManagementDO.getEndDate().atStartOfDay());
ticketManagementMapper.insert(ticketManagementDO);
List<DvSubjectDO> dvSubjectDOList = new ArrayList<>();
List<SubjectPlanDO> subjectPlanDOList = subjectPlanMapper.selectList(Wrappers.<SubjectPlanDO>lambdaQuery().eq(SubjectPlanDO::getPlanId, ticketManagementDO.getPlanId()));
for (SubjectPlanDO subjectPlanDO : subjectPlanDOList) {
DvSubjectDO dvSubjectDO = dvSubjectMapper.selectById(subjectPlanDO.getSubjectId());
@ -167,6 +170,95 @@ public class TaskManagementServiceImpl implements TaskManagementService {
}
@Override
public void updateTaskManagementEnabled(TaskManagementUpdateEnabledReqVO updateEnabledReqVO) {
// 1. 校验任务是否存在
TaskManagementDO task = taskManagementMapper.selectById(updateEnabledReqVO.getId());
if (task == null) {
throw exception(TASK_MANAGEMENT_NOT_EXISTS);
}
// 2. 如果状态没有变化,直接返回
if (Objects.equals(task.getEnabled(), updateEnabledReqVO.getEnabled())) {
return;
}
// 3. 执行状态更新操作
executeEnableUpdate(task, updateEnabledReqVO.getEnabled());
}
/**
* /
*/
private void executeEnableUpdate(TaskManagementDO task, Boolean enabled) {
// 更新数据库状态
updateTaskStatusInDB(task.getId(), enabled);
// 处理定时任务
if (enabled) {
enableTaskSchedule(task);
} else {
disableTaskSchedule(task);
}
}
/**
*
*/
private void enableTaskSchedule(TaskManagementDO task) {
// 验证Cron表达式
if (StringUtils.isBlank(task.getCronExpression())) {
log.warn("任务{}没有配置Cron表达式无法启用调度", task.getId());
return;
}
// 启动定时任务
try {
boolean success = taskSchedulerManager.startWorkOrderTask(
task.getId(),
task.getCronExpression()
);
if (success) {
log.info("任务{}调度启动成功", task.getId());
} else {
log.error("任务{}调度启动失败", task.getId());
// 可以记录失败日志或发送通知
}
} catch (Exception e) {
log.error("任务{}调度启动异常", task.getId(), e);
// 可以考虑回滚数据库状态
}
}
/**
*
*/
private void disableTaskSchedule(TaskManagementDO task) {
try {
boolean success = taskSchedulerManager.stopWorkOrderTask(task.getId());
if (success) {
log.info("任务{}调度停止成功", task.getId());
} else {
log.warn("任务{}调度停止失败,可能任务不存在或已停止", task.getId());
}
} catch (Exception e) {
log.error("任务{}调度停止异常", task.getId(), e);
}
}
/**
*
*/
private void updateTaskStatusInDB(Long taskId, Boolean enabled) {
TaskManagementDO updateObj = new TaskManagementDO();
updateObj.setId(taskId);
updateObj.setEnabled(enabled);
taskManagementMapper.updateById(updateObj);
}
/**
*

Loading…
Cancel
Save