feat:完成采集设备定时任务

plp
HuangHuiKang 3 weeks ago
parent 4a2dd23e5c
commit 7777ff1929

@ -0,0 +1,14 @@
package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.core;
public interface Task {
/**
*
*/
void execute(Long taskId, String taskParam);
/**
*
*/
String getTaskType();
}

@ -1,39 +1,193 @@
// DeviceTask.java - 原有设备任务
package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.coretask; package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.coretask;
import cn.iocoder.yudao.framework.common.util.opc.OpcUtils;
import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.core.Task;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.DeviceDO;
import cn.iocoder.yudao.module.iot.dal.devicecontactmodel.DeviceContactModelDO;
import cn.iocoder.yudao.module.iot.dal.mysql.device.DeviceMapper;
import cn.iocoder.yudao.module.iot.dal.mysql.devicecontactmodel.DeviceContactModelMapper;
import cn.iocoder.yudao.module.iot.service.device.TDengineService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List;
@Component @Component
public class DeviceTask { @Slf4j
public class DeviceTask implements Task {
private static final Logger logger = LoggerFactory.getLogger(DeviceTask.class); private static final Logger logger = LoggerFactory.getLogger(DeviceTask.class);
/** @Resource
* private DeviceContactModelMapper deviceContactModelMapper;
* @param deviceId ID
* @param deviceCode @Resource
*/ private DeviceMapper deviceMapper;
public void executeDeviceLogic(Long deviceId, String deviceCode) {
@Resource
private TDengineService tDengineService;
@Override
public String getTaskType() {
return "DEVICE";
}
@Override
public void execute(Long taskId, String taskParam) {
try { try {
// 创建时间格式化对象
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String currentTime = sdf.format(new Date()); String currentTime = sdf.format(new Date());
logger.info("执行设备任务设备ID: {}, 设备编码: {}, 时间: {}", logger.info("执行设备任务任务ID: {}, 参数: {}, 时间: {}",
deviceId, deviceCode, currentTime); taskId, taskParam, currentTime);
// 解析参数,假设格式为 deviceId:deviceCode
// String[] params = taskParam.split(":");
// if (params.length >= 2) {
// Long deviceId = Long.parseLong(params[0]);
// String deviceCode = params[1];
// executeDeviceLogic(deviceId, deviceCode);
// }
executeDeviceLogic(taskId,taskParam);
// TODO: 这里编写具体的设备执行逻辑
// 比如:读取设备数据、发送指令、处理响应等
} catch (Exception e) { } catch (Exception e) {
// 异常信息中也加入时间戳 logger.error("执行设备任务异常任务ID: {}", taskId, e);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); }
String errorTime = sdf.format(new Date()); }
/**
*
*/
private void executeDeviceLogic(Long deviceId, String param) {
logger.info("执行设备 {} 的具体逻辑,参数: {}", deviceId, param);
// 1. 参数校验
if (deviceId == null){
logger.error("设备ID不能为空");
throw new RuntimeException("设备ID不能为空");
}
// 2. 查询设备信息
DeviceDO deviceDO = deviceMapper.selectById(deviceId);
if (deviceDO == null) {
logger.error("设备不存在设备ID: {}", deviceId);
throw new RuntimeException("设备不存在设备ID: " + deviceId);
}
if (deviceDO.getUrl() == null || deviceDO.getUrl().trim().isEmpty()) {
logger.error("设备URL不能为空设备ID: {}", deviceId);
throw new RuntimeException("设备URL不能为空");
}
// 3. 连接OPC服务器
String username = deviceDO.getUsername() != null ? deviceDO.getUsername() : "";
String password = deviceDO.getPassword() != null ? deviceDO.getPassword() : "";
logger.error("执行设备任务异常设备ID: {}, 异常时间: {}", boolean connected = OpcUtils.connect(deviceDO.getUrl(), username, password, 10);
deviceId, errorTime, e); if (!connected) {
logger.error("连接OPC服务器失败设备ID: {}URL: {}", deviceId, deviceDO.getUrl());
throw new RuntimeException("连接OPC服务器失败");
}
// 4. 查询设备点位配置
LambdaQueryWrapper<DeviceContactModelDO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(DeviceContactModelDO::getDeviceId, deviceId);
List<DeviceContactModelDO> deviceContactModelDOS = deviceContactModelMapper.selectList(queryWrapper);
// 5. 判断是否有点位数据
if (deviceContactModelDOS == null || deviceContactModelDOS.isEmpty()) {
logger.warn("设备 {} 没有配置数据点位,跳过数据读取", deviceId);
return;
}
logger.info("设备 {} 共有 {} 个点位需要读取", deviceId, deviceContactModelDOS.size());
// 6. 读取OPC数据
int successCount = 0;
List<DeviceContactModelDO> validDataList = new ArrayList<>();
for (DeviceContactModelDO deviceContactModelDO : deviceContactModelDOS) {
try {
// 判断点位地址是否有效
String address = deviceContactModelDO.getAddress();
if (address == null || address.trim().isEmpty()) {
logger.warn("点位ID {} 的地址为空,跳过", deviceContactModelDO.getId());
continue;
}
// 读取OPC值
Object addressValue = OpcUtils.readValue(address);
if (addressValue == null) {
logger.warn("读取点位 {} 的值返回null地址: {}",
deviceContactModelDO.getId(), address);
} else {
// 值验证
if (addressValue instanceof String) {
String strValue = (String) addressValue;
if (strValue.trim().isEmpty()) {
logger.warn("读取点位 {} 的值为空字符串", deviceContactModelDO.getId());
deviceContactModelDO.setAddressValue("");
} else {
deviceContactModelDO.setAddressValue(addressValue);
successCount++;
}
} else {
deviceContactModelDO.setAddressValue(addressValue);
successCount++;
}
}
validDataList.add(deviceContactModelDO);
} catch (Exception e) {
logger.error("读取点位 {} 异常,地址: {}",
deviceContactModelDO.getId(),
deviceContactModelDO.getAddress(), e);
}
}
// 7. 判断是否有有效数据
if (validDataList.isEmpty()) {
logger.warn("设备 {} 没有读取到任何有效数据,跳过入库", deviceId);
return;
}
logger.info("设备 {} 成功读取 {} 个点位数据,总计 {} 个点位",
deviceId, successCount, validDataList.size());
try {
// 8. 数据入库
String json = JSON.toJSONString(validDataList);
boolean insertSuccess = tDengineService.insertDeviceData(deviceId, json);
if (insertSuccess) {
logger.info("设备 {} 数据成功插入TDengine数据量: {}", deviceId, validDataList.size());
} else {
logger.error("设备 {} 数据插入TDengine失败", deviceId);
}
} catch (Exception e) {
logger.error("设备 {} 数据入库异常", deviceId, e);
} finally {
// 9. 确保断开连接
try {
OpcUtils.disconnect();
} catch (Exception e) {
logger.error("断开OPC连接异常设备ID: {}", deviceId, e);
}
} }
} }
} }

@ -0,0 +1,49 @@
// DataSyncTask.java - 数据同步任务
package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.coretask;
import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.core.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class GenerateWorkOrderTask implements Task {
private static final Logger logger = LoggerFactory.getLogger(GenerateWorkOrderTask.class);
@Override
public String getTaskType() {
return "WORK_ORDER";
}
@Override
public void execute(Long taskId, String taskParam) {
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String currentTime = sdf.format(new Date());
logger.info("执行数据同步任务任务ID: {}, 参数: {}, 时间: {}",
taskId, taskParam, currentTime);
// 解析同步参数
// 格式示例: "source:target" 或 "table:condition"
syncData(taskParam);
} catch (Exception e) {
logger.error("执行数据同步任务异常任务ID: {}", taskId, e);
}
}
private void syncData(String param) {
// TODO: 数据同步逻辑
logger.info("开始同步数据,参数: {}", param);
// 示例:
// 1. 从源数据库读取数据
// 2. 转换数据格式
// 3. 写入目标数据库
// 4. 记录同步日志
}
}

@ -1,142 +1,230 @@
// TaskSchedulerManager.java // TaskSchedulerManager.java
package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.scheduler; package cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.scheduler;
import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.coretask.DeviceTask; import cn.iocoder.yudao.module.iot.controller.admin.device.scheduled.core.Task;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger; import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@Component @Component
public class TaskSchedulerManager { public class TaskSchedulerManager {
private static final Logger logger = LoggerFactory.getLogger(TaskSchedulerManager.class); private static final Logger logger = LoggerFactory.getLogger(TaskSchedulerManager.class);
private TaskScheduler taskScheduler; // 移除 static private final TaskScheduler taskScheduler;
private DeviceTask deviceTask; // 移除 static
private final Map<Long, ScheduledFuture<?>> taskMap = new ConcurrentHashMap<>(); // 存储所有任务
private final Map<String, Task> taskBeans = new ConcurrentHashMap<>();
// 存储正在运行的任务
private final Map<Long, ScheduledFuture<?>> runningTasks = new ConcurrentHashMap<>();
private final Map<Long, TaskInfo> taskInfos = new ConcurrentHashMap<>();
// 任务信息类
private static class TaskInfo {
private Long taskId;
private String taskType;
private String taskParam;
private String cronExpression;
private String scheduleType; // "CRON" 或 "FIXED_RATE" 或 "FIXED_DELAY"
private Long initialDelay;
private Long period;
private TimeUnit timeUnit;
// 构造器
public TaskInfo(Long taskId, String taskType, String taskParam) {
this.taskId = taskId;
this.taskType = taskType;
this.taskParam = taskParam;
}
// getters and setters...
}
// 通过构造器注入
public TaskSchedulerManager(TaskScheduler taskScheduler,
@Autowired(required = false) java.util.List<Task> tasks) {
this.taskScheduler = taskScheduler;
// 注册所有任务
if (tasks != null) {
for (Task task : tasks) {
taskBeans.put(task.getTaskType(), task);
logger.info("注册任务类型: {}", task.getTaskType());
}
}
}
/** /**
* TaskScheduler * Cron
* Spring TaskScheduler Bean
*/ */
@Autowired public boolean startCronTask(Long taskId, String taskType, String taskParam, String cronExpression) {
public TaskSchedulerManager(TaskScheduler taskScheduler) { return startTask(taskId, taskType, taskParam, "CRON", cronExpression, null, null, null);
this.taskScheduler = taskScheduler;
} }
/** /**
* DeviceTask *
*/ */
@Autowired public boolean startFixedRateTask(Long taskId, String taskType, String taskParam,
public void setDeviceTask(DeviceTask deviceTask) { long initialDelay, long period, TimeUnit timeUnit) {
this.deviceTask = deviceTask; return startTask(taskId, taskType, taskParam, "FIXED_RATE", null,
initialDelay, period, timeUnit);
} }
/** /**
* Spring TaskScheduler Bean *
*
*/ */
@PostConstruct public boolean startFixedDelayTask(Long taskId, String taskType, String taskParam,
public void init() { long initialDelay, long period, TimeUnit timeUnit) {
if (taskScheduler == null) { return startTask(taskId, taskType, taskParam, "FIXED_DELAY", null,
logger.warn("TaskScheduler not found in context, creating default one"); initialDelay, period, timeUnit);
ThreadPoolTaskScheduler defaultScheduler = new ThreadPoolTaskScheduler();
defaultScheduler.setPoolSize(10);
defaultScheduler.setThreadNamePrefix("device-task-");
defaultScheduler.initialize();
this.taskScheduler = defaultScheduler;
}
} }
/** /**
* *
* static
*/ */
public boolean startDeviceTask(Long deviceId, String deviceCode, String cronExpression) { private boolean startTask(Long taskId, String taskType, String taskParam,
String scheduleType, String cronExpression,
Long initialDelay, Long period, TimeUnit timeUnit) {
try { try {
// 先停止已存在的任务 // 停止已存在的任务
stopDeviceTask(deviceId); stopTask(taskId);
// 验证 DeviceTask // 获取任务实例
if (deviceTask == null) { Task task = taskBeans.get(taskType);
logger.error("DeviceTask is not initialized"); if (task == null) {
logger.error("任务类型不存在: {}", taskType);
return false; return false;
} }
// 创建新的定时任务 // 创建任务信息
ScheduledFuture<?> future = taskScheduler.schedule( TaskInfo taskInfo = new TaskInfo(taskId, taskType, taskParam);
() -> { taskInfo.scheduleType = scheduleType;
try { taskInfo.cronExpression = cronExpression;
deviceTask.executeDeviceLogic(deviceId, deviceCode); taskInfo.initialDelay = initialDelay;
} catch (Exception e) { taskInfo.period = period;
logger.error("Device task execution failed for deviceId: {}", deviceId, e); taskInfo.timeUnit = timeUnit;
} taskInfos.put(taskId, taskInfo);
},
new CronTrigger(cronExpression) // 根据调度类型创建任务
); ScheduledFuture<?> future = null;
switch (scheduleType) {
taskMap.put(deviceId, future); case "CRON":
logger.info("启动设备定时任务成功设备ID: {}, cron表达式: {}", deviceId, cronExpression); future = taskScheduler.schedule(
return true; () -> task.execute(taskId, taskParam),
new CronTrigger(cronExpression)
);
break;
case "FIXED_RATE":
if (initialDelay != null && period != null && timeUnit != null) {
future = taskScheduler.scheduleAtFixedRate(
() -> task.execute(taskId, taskParam),
initialDelay
);
}
break;
case "FIXED_DELAY":
if (initialDelay != null && period != null && timeUnit != null) {
future = taskScheduler.scheduleWithFixedDelay(
() -> task.execute(taskId, taskParam),
initialDelay
);
}
break;
}
if (future != null) {
runningTasks.put(taskId, future);
logger.info("启动{}任务成功任务ID: {}, 类型: {}, 参数: {}",
scheduleType, taskId, taskType, taskParam);
return true;
}
return false;
} catch (Exception e) { } catch (Exception e) {
logger.error("启动设备定时任务失败设备ID: {}", deviceId, e); logger.error("启动任务失败任务ID: {}", taskId, e);
return false; return false;
} }
} }
/** /**
* *
* static
*/ */
public boolean stopDeviceTask(Long deviceId) { public boolean stopTask(Long taskId) {
try { try {
ScheduledFuture<?> future = taskMap.get(deviceId); ScheduledFuture<?> future = runningTasks.get(taskId);
if (future != null) { if (future != null) {
future.cancel(true); future.cancel(true);
taskMap.remove(deviceId); runningTasks.remove(taskId);
logger.info("停止设备定时任务成功设备ID: {}", deviceId); taskInfos.remove(taskId);
logger.info("停止任务成功任务ID: {}", taskId);
return true; return true;
} else {
logger.info("设备定时任务不存在设备ID: {}", deviceId);
return false;
} }
return false;
} catch (Exception e) { } catch (Exception e) {
logger.error("停止设备定时任务失败设备ID: {}", deviceId, e); logger.error("停止任务失败任务ID: {}", taskId, e);
return false; return false;
} }
} }
/** /**
* *
*/ */
public boolean isTaskRunning(Long deviceId) { public TaskInfo getTaskInfo(Long taskId) {
ScheduledFuture<?> future = taskMap.get(deviceId); return taskInfos.get(taskId);
}
/**
*
*/
public boolean isTaskRunning(Long taskId) {
ScheduledFuture<?> future = runningTasks.get(taskId);
return future != null && !future.isCancelled() && !future.isDone(); return future != null && !future.isCancelled() && !future.isDone();
} }
/** /**
* *
*/ */
public int getTaskCount() { public Map<String, Task> getAllTaskTypes() {
return taskMap.size(); return new ConcurrentHashMap<>(taskBeans);
}
/**
*
*/
public int getRunningTaskCount() {
return runningTasks.size();
} }
/** /**
* *
*/ */
public void stopAllTasks() { public void stopAllTasks() {
logger.info("停止所有定时任务,共 {} 个", taskMap.size()); logger.info("停止所有定时任务,共 {} 个", runningTasks.size());
for (Long deviceId : taskMap.keySet()) { for (Long taskId : runningTasks.keySet()) {
stopDeviceTask(deviceId); stopTask(taskId);
} }
} }
/**
*
*/
public boolean startDeviceTask(Long deviceId, String deviceCode, String cronExpression) {
String taskParam = deviceId + ":" + deviceCode;
return startCronTask(deviceId, "DEVICE", taskParam, cronExpression);
}
} }

@ -406,23 +406,13 @@ public class DeviceServiceImpl implements DeviceService {
deviceDO.setStatus(String.valueOf(DeviceConnectionStatusEnum.CONNECTED.getStatus())); deviceDO.setStatus(String.valueOf(DeviceConnectionStatusEnum.CONNECTED.getStatus()));
deviceMapper.updateById(deviceDO); deviceMapper.updateById(deviceDO);
String cronExpression = CronExpressionUtils.secondsToCron(deviceDO.getSampleCycle());
//查询存储 taskSchedulerManager.startDeviceTask(
LambdaQueryWrapper<DeviceContactModelDO> deviceModelAttributeLambdaQueryWrapper = new LambdaQueryWrapper<>(); deviceDO.getId(),
deviceModelAttributeLambdaQueryWrapper.eq(DeviceContactModelDO::getDeviceId,createReqVO.getId()); deviceDO.getDeviceCode(),
List<DeviceContactModelDO> deviceContactModelDOS = deviceContactModelMapper.selectList(deviceModelAttributeLambdaQueryWrapper); cronExpression
);
if (deviceContactModelDOS != null && deviceContactModelDOS.size() > 0){
for (DeviceContactModelDO deviceContactModelDO : deviceContactModelDOS) {
Object addressValue = OpcUtils.readValue(deviceContactModelDO.getAddress() != null ? deviceContactModelDO.getAddress() : "");
deviceContactModelDO.setAddressValue(addressValue);
}
String json = JSON.toJSONString(deviceContactModelDOS);
tdengineService.insertDeviceData(createReqVO.getId(),json);
}
}else { }else {
@ -433,6 +423,7 @@ public class DeviceServiceImpl implements DeviceService {
if (disconnect){ if (disconnect){
deviceDO.setStatus(String.valueOf(DeviceConnectionStatusEnum.DISCONNECTED.getStatus())); deviceDO.setStatus(String.valueOf(DeviceConnectionStatusEnum.DISCONNECTED.getStatus()));
deviceMapper.updateById(deviceDO); deviceMapper.updateById(deviceDO);
taskSchedulerManager.stopTask(deviceDO.getId());
}else { }else {
throw exception(OPC_CLOSE_CONNECT_FAILURE); throw exception(OPC_CLOSE_CONNECT_FAILURE);
} }
@ -723,7 +714,7 @@ public class DeviceServiceImpl implements DeviceService {
public Boolean scheduledStop(Long id) { public Boolean scheduledStop(Long id) {
try { try {
// 1. 停止定时任务 // 1. 停止定时任务
taskSchedulerManager.stopDeviceTask(id); taskSchedulerManager.stopTask(id);
return true; return true;

Loading…
Cancel
Save