Răsfoiți Sursa

feate: 数据上报自动设备离线检测

jackzhou 5 luni în urmă
părinte
comite
c775c8c3fc

+ 94 - 36
snowy-plugin/snowy-plugin-coldchain/src/main/java/vip/xiaonuo/coldchain/core/alarm/offline/DeviceOfflineDetectionService.java

@@ -20,9 +20,7 @@ import vip.xiaonuo.common.cache.CommonCacheOperator;
 
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
+import java.util.*;
 import java.util.stream.Collectors;
 
 /**
@@ -67,52 +65,112 @@ public class DeviceOfflineDetectionService {
      * 定期检测设备状态是否离线
      * 每 5 分钟运行一次
      */
-    @Scheduled(fixedRate = 300000) // 每 5 分钟(以毫秒为单位)
-//    @Scheduled(fixedRateString = "${coldchain.device-offline-interval:300000}")
+//    @Scheduled(fixedRate = 300000) // 每 5 分钟(以毫秒为单位)
+////    @Scheduled(fixedRateString = "${coldchain.device-offline-interval:300000}")
+//    public void checkDeviceStatus() {
+//        log.info("开始自动检测设备在线状态...");
+//        // 获取 Redis 中所有设备的键
+//        Set<String> deviceKeys = getAllDeviceCodes();
+//        Map<String,Integer> deviceCodes = new LinkedHashMap<>();
+//        if(deviceKeys!=null && !deviceKeys.isEmpty()){
+//            // 遍历设备键,逐一检测状态
+//            for (String key : deviceKeys) {
+//                String[] split = key.split(KEY_SPILT);
+//                if (split.length == 0) {
+//                    continue;
+//                }
+//                String deviceCode = split[0];
+//                Integer route = null;
+//                if (split.length == 2) {
+//                    route = Integer.valueOf(split[1]);
+//                }
+//                deviceCodes.put(deviceCode,route);
+//                // 从 Redis 中获取设备的最后上报时间
+//                String lastReportTimeStr = getLastDeviceTime(getKey(deviceCode, route));
+//                if (lastReportTimeStr != null) {
+//                    // 将上报时间字符串解析为 LocalDateTime 对象
+//                    LocalDateTime lastReportTime = LocalDateTime.parse(lastReportTimeStr, DATE_TIME_FORMATTER);
+//                    // 获取当前时间
+//                    LocalDateTime now = LocalDateTime.now();
+//                    final long OFFLINE_THRESHOLD_MINUTES = jfcloudColdChainServerProperties.getDeviceOfflineInterval() / 60 / 1000;
+//                    // 如果最后上报时间超过离线阈值,判断设备为离线
+//                    if (lastReportTime.plusMinutes(OFFLINE_THRESHOLD_MINUTES).isBefore(now)) {
+//                        log.error("设备{}-{} 已离线,最后上报时间:{}。\n", deviceCode, route, lastReportTimeStr);
+//                        publishAlarm(deviceCode, route, lastReportTimeStr);
+//                        monitorTargetService.updateStatusByDeviceCode(deviceCode, route, MonitorStatusEnum.OFF);
+//                    } else {
+//                        log.info("设备 {}-{} 在线,最后上报时间:{}。\n", deviceCode, route, lastReportTimeStr);
+//                        monitorTargetService.updateStatusByDeviceCode(deviceCode, route, MonitorStatusEnum.ONLINE);
+//                    }
+//                } else {
+//                    monitorTargetService.updateStatusByDeviceCode(deviceCode, route, MonitorStatusEnum.OFF);
+//                    log.info("设备 {}-{} 缺少数据,可能从未上报过。\n", deviceCode, route);
+//                }
+//            }
+//        }
+//        log.info("查找出从来没上线的设备都统一都下线处理");
+//        monitorTargetService.updateStatusbatch(deviceCodes);
+//    }
+    @Scheduled(fixedRate = 300000) // 每 5 分钟运行一次
     public void checkDeviceStatus() {
-        log.info("开始检测离线设备状态...");
-        // 获取 Redis 中所有设备的键
+        log.info("开始自动检测设备在线状态...");
+        Map<String, Integer> deviceCodes = new LinkedHashMap<>();
         Set<String> deviceKeys = getAllDeviceCodes();
-        if (deviceKeys == null || deviceKeys.isEmpty()) {
-            log.info("没有需要检测的设备。");
+        if(deviceKeys!=null && deviceKeys.size()>0){
+            LocalDateTime now = LocalDateTime.now();
+            final long OFFLINE_THRESHOLD_MINUTES = jfcloudColdChainServerProperties.getDeviceOfflineInterval() / 60 / 1000;
+            for (String key : deviceKeys) {
+                processDevice(key, now, OFFLINE_THRESHOLD_MINUTES, deviceCodes);
+            }
+        }
+        log.info("查找出从来没上线的设备,统一设置为离线...");
+        monitorTargetService.updateStatusbatch(deviceCodes);
+    }
+
+    /**
+     * 处理设备状态
+     * @param key
+     * @param now
+     * @param offlineThresholdMinutes
+     * @param deviceCodes
+     */
+    private void processDevice(String key, LocalDateTime now, long offlineThresholdMinutes, Map<String, Integer> deviceCodes) {
+        String[] split = key.split(KEY_SPILT);
+        if (split.length == 0) {
             return;
         }
-        // 遍历设备键,逐一检测状态
-        for (String key : deviceKeys) {
-            String[] split = key.split(KEY_SPILT);
-            if (split.length == 0) {
-                continue;
-            }
-            String deviceCode = split[0];
-            Integer route = null;
+        String deviceCode = split[0];
+        Integer route = null;
+        try {
             if (split.length == 2) {
                 route = Integer.valueOf(split[1]);
             }
-            // 从 Redis 中获取设备的最后上报时间
-            String lastReportTimeStr = getLastDeviceTime(getKey(deviceCode, route));
-            if (lastReportTimeStr != null) {
-                // 将上报时间字符串解析为 LocalDateTime 对象
-                LocalDateTime lastReportTime = LocalDateTime.parse(lastReportTimeStr, DATE_TIME_FORMATTER);
-                // 获取当前时间
-                LocalDateTime now = LocalDateTime.now();
-                final long OFFLINE_THRESHOLD_MINUTES = jfcloudColdChainServerProperties.getDeviceOfflineInterval() / 60 / 1000;
-                // 如果最后上报时间超过离线阈值,判断设备为离线
-                if (lastReportTime.plusMinutes(OFFLINE_THRESHOLD_MINUTES).isBefore(now)) {
-                    log.error("设备{}-{} 已离线,最后上报时间:{}。\n", deviceCode, route, lastReportTimeStr);
-                    publishAlarm(deviceCode, route, lastReportTimeStr);
-                    monitorTargetService.updateStatusByDeviceCode(deviceCode, route, MonitorStatusEnum.OFF);
-                } else {
-                    log.info("设备 {}-{} 在线,最后上报时间:{}。\n", deviceCode, route, lastReportTimeStr);
-                    monitorTargetService.updateStatusByDeviceCode(deviceCode, route, MonitorStatusEnum.ONLINE);
-                }
+        } catch (NumberFormatException e) {
+            log.error("设备键 {} 中的 route 数据格式错误,跳过处理。", key, e);
+            return;
+        }
+        deviceCodes.put(deviceCode, route);
+        String lastReportTimeStr = getLastDeviceTime(getKey(deviceCode, route));
+        MonitorStatusEnum status;
+        if (lastReportTimeStr != null) {
+            LocalDateTime lastReportTime = LocalDateTime.parse(lastReportTimeStr, DATE_TIME_FORMATTER);
+            if (lastReportTime.plusMinutes(offlineThresholdMinutes).isBefore(now)) {
+                status = MonitorStatusEnum.OFF;
+                log.warn("设备 {}-{} 已离线,最后上报时间:{}。", deviceCode, route, lastReportTimeStr);
+                publishAlarm(deviceCode, route, lastReportTimeStr);
             } else {
-                monitorTargetService.updateStatusByDeviceCode(deviceCode, route, MonitorStatusEnum.OFF);
-                log.info("设备 {}-{} 缺少数据,可能从未上报过。\n", deviceCode, route);
+                status = MonitorStatusEnum.ONLINE;
+                log.debug("设备 {}-{} 在线,最后上报时间:{}。", deviceCode, route, lastReportTimeStr);
             }
+        } else {
+            status = MonitorStatusEnum.OFF;
+            log.info("设备 {}-{} 缺少数据,可能从未上报过。", deviceCode, route);
         }
+        monitorTargetService.updateStatusByDeviceCode(deviceCode, route, status);
     }
 
 
+
     private void publishAlarm(String deviceCode, Integer route, String time) {
         MonitorTargetRegion monitorTargetRegion = monitorTargetRegionService.findOneByDeviceCodeAndSensorNo(deviceCode, route);
         if (Objects.isNull(monitorTargetRegion)) {

+ 3 - 0
snowy-plugin/snowy-plugin-coldchain/src/main/java/vip/xiaonuo/coldchain/modular/monitortarget/service/MonitorTargetService.java

@@ -21,6 +21,7 @@ import vip.xiaonuo.coldchain.modular.targetroom.entity.TargetRoom;
 
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 
 /**
  * 监控对象管理Service接口
@@ -138,4 +139,6 @@ public interface MonitorTargetService extends IService<MonitorTarget> {
     void editWithRoom(MonitorTargetEditWithRoomParam monitorTargetEditWithRoomParam);
 
     TargetRoom getRoomByMonitorTargetId(String monitorTargetId);
+
+    void updateStatusbatch(Map<String,Integer> deviceCodes);
 }

+ 40 - 7
snowy-plugin/snowy-plugin-coldchain/src/main/java/vip/xiaonuo/coldchain/modular/monitortarget/service/impl/MonitorTargetServiceImpl.java

@@ -19,9 +19,11 @@ import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.StrUtil;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import vip.xiaonuo.auth.core.pojo.SaBaseLoginUser;
@@ -42,10 +44,8 @@ import vip.xiaonuo.common.enums.CommonSortOrderEnum;
 import vip.xiaonuo.common.exception.CommonException;
 import vip.xiaonuo.common.page.CommonPageRequest;
 
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Objects;
+import java.util.*;
+import java.util.stream.Collectors;
 
 /**
  * 监控对象管理Service接口实现类
@@ -54,8 +54,8 @@ import java.util.Objects;
  * @date 2024/11/13 16:56
  **/
 @Service
+@Slf4j
 public class MonitorTargetServiceImpl extends ServiceImpl<MonitorTargetMapper, MonitorTarget> implements MonitorTargetService {
-
     @Resource
     private MonitorTargetRegionService monitorTargetRegionService;
     @Resource
@@ -147,9 +147,9 @@ public class MonitorTargetServiceImpl extends ServiceImpl<MonitorTargetMapper, M
 
     @Override
     public TargetRoom getRoomByMonitorTargetId(String monitorTargetId) {
-        if(Objects.nonNull(monitorTargetId)){
+        if (Objects.nonNull(monitorTargetId)) {
             MonitorTarget monitorTarget = this.getById(monitorTargetId);
-            if(Objects.nonNull(monitorTarget)){
+            if (Objects.nonNull(monitorTarget)) {
                 String roomId = monitorTarget.getRoomId();
                 return targetRoomService.getById(roomId);
             }
@@ -157,6 +157,39 @@ public class MonitorTargetServiceImpl extends ServiceImpl<MonitorTargetMapper, M
         return null;
     }
 
+    @Override
+    public void updateStatusbatch(Map<String, Integer> notInPairs) {
+        // 构建查询条件
+        LambdaQueryWrapper<MonitorTargetRegion> queryWrapper = new LambdaQueryWrapper<>();
+        // 将每个 (deviceCode, sensorRoute) 组合转换为 "deviceCode:sensorRoute" 格式的字符串
+        List<String> notInConditions = notInPairs.entrySet().stream()
+                .map(entry -> entry.getKey() + ":" + entry.getValue())
+                .collect(Collectors.toList());
+        queryWrapper.notIn(MonitorTargetRegion::getDeviceCodeAndSensorRoute, notInConditions);
+        // 查询不符合条件的 MonitorTargetRegion
+        List<MonitorTargetRegion> targetRegions = monitorTargetRegionService.list(queryWrapper);
+        // 使用 stream 过滤出 deviceCode 不为空的 monitorTargetId 列表
+        List<String> monitorTargetIds = targetRegions.stream()
+                .filter(monitorTargetRegion -> monitorTargetRegion.getDeviceCode() != null)
+                .map(MonitorTargetRegion::getMonitorTargetId) // 提取 monitorTargetId
+                .toList();
+        // 批量更新状态
+        if (!monitorTargetIds.isEmpty()) {
+            LambdaUpdateWrapper<MonitorTarget> updateWrapper = new LambdaUpdateWrapper<>();
+            updateWrapper.in(MonitorTarget::getId, monitorTargetIds); // 条件:monitorTargetId 在列表中
+            updateWrapper.set(MonitorTarget::getStatus, MonitorStatusEnum.OFF.getCode()); // 设置状态为 OFF
+            // 执行批量更新
+            boolean updateResult = this.update(updateWrapper);
+            if (updateResult) {
+                log.info("批量更新成功,已将 {} 条记录状态修改为 OFF", monitorTargetIds.size());
+            } else {
+                log.warn("批量更新失败,无法更新状态为 OFF 的记录");
+            }
+        } else {
+            log.info("未找到符合条件的 monitorTargetId,无需更新");
+        }
+    }
+
 
     @Transactional(rollbackFor = Exception.class)
     @Override

+ 6 - 0
snowy-plugin/snowy-plugin-coldchain/src/main/java/vip/xiaonuo/coldchain/modular/monitortargetregion/entity/MonitorTargetRegion.java

@@ -225,4 +225,10 @@ public class MonitorTargetRegion extends OrgEntity {
     public void setCo2Down(Float co2Down) {
         this.co2Down = co2Down;
     }
+
+
+    public String getDeviceCodeAndSensorRoute() {
+        return this.deviceCode + ":" + this.sensorRoute;
+    }
+
 }