Quellcode durchsuchen

fix:修复上线预警失效

黄渊昊 vor 1 Woche
Ursprung
Commit
c63088082b

+ 15 - 0
snowy-plugin/snowy-plugin-coldchain/src/main/java/vip/xiaonuo/coldchain/core/alarm/mapper/SensorAlarmMapper.java

@@ -3,6 +3,7 @@ package vip.xiaonuo.coldchain.core.alarm.mapper;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 import com.baomidou.mybatisplus.core.toolkit.Constants;
+import org.apache.ibatis.annotations.Delete;
 import org.apache.ibatis.annotations.Param;
 import org.apache.ibatis.annotations.ResultType;
 import org.apache.ibatis.annotations.Select;
@@ -77,4 +78,18 @@ public interface SensorAlarmMapper extends BaseMapper<SensorAlarm> {
                             @Param("sensorRoute") Integer sensorRoute,
                             @Param("types") List<String> types,
                             @Param("time") Date time);
+
+    /**
+     * 根据时间范围获取告警数据
+     */
+    @Select("SELECT *\n" +
+            "from sensor_alarm\n" +
+            "WHERE CREATE_TIME <= #{time}")
+    List<SensorAlarm> getAlarmByTime(@Param("time") Date time);
+
+    /**
+     * 删除已归档的数据
+     */
+    @Delete("DELETE FROM sensor_alarm WHERE CREATE_TIME <= #{time}")
+    void deleteByTime(@Param("time") Date time);
 }

+ 16 - 7
snowy-plugin/snowy-plugin-coldchain/src/main/java/vip/xiaonuo/coldchain/core/alarm/service/messagepush/RedisSensorAlarmMessagePushService.java

@@ -44,10 +44,19 @@ public class RedisSensorAlarmMessagePushService {
     public void pushAlarmMessage(SensorAlarm alarm) {
         String deviceID = alarm.getDeviceId();
         String alarmType = alarm.getAlarmType();
+
+        // 在循环前获取并保存 firstAlarmTime
+        Long firstAlarmTimeForRecovery = 0L;
+        Long firstAlarmTimeForOnline = 0L;
+
         if (alarm.getType().equals("0") || alarm.getType().equals("1")) {
-//        将第一次预警时间存入redis
             recordAlarmTime(deviceID, alarm.getSensorRoute(), alarm.getType());
+        } else if (alarm.getType().equals("2")) {
+            firstAlarmTimeForRecovery = (Long) getAlarmTime(deviceID, alarm.getSensorRoute(), "0");
+        } else if (alarm.getType().equals("4")) {
+            firstAlarmTimeForOnline = (Long) getAlarmTime(deviceID, alarm.getSensorRoute(), "1");
         }
+
         Map<String, SensorAlarmUser> users = Optional.ofNullable(alarm.getAlarmUsers()).orElse(Collections.emptyList()) // 如果为 null 则返回空集合
                 .stream().filter(Objects::nonNull) // 过滤掉 null 的用户
                 .filter(user -> StrUtil.isNotBlank(user.getOpenId())) // 过滤掉 openId 为空的用户
@@ -97,7 +106,7 @@ public class RedisSensorAlarmMessagePushService {
                 if (pushService != null) {
                     try {
                         boolean b = false;
-                        Long firstAlarmTime;
+//                        Long firstAlarmTime;
                         switch (alarm.getType()) {
                             case "0":
                                 log.info("发送设备 {} 第 {} 路 {} 告警消息", deviceID, alarm.getSensorRoute(), alarm.getAlarmType());
@@ -107,15 +116,15 @@ public class RedisSensorAlarmMessagePushService {
                                 b = pushService.sendOfflineMessage(alarm);
                                 break;
                             case "2":
-                                firstAlarmTime = (Long) getAlarmTime(deviceID, alarm.getSensorRoute(), "0");
-                                b = pushService.sendAlarmRecoverMessage(alarm, firstAlarmTime);
+//                                firstAlarmTime = (Long) getAlarmTime(deviceID, alarm.getSensorRoute(), "0");
+                                b = pushService.sendAlarmRecoverMessage(alarm, firstAlarmTimeForRecovery);
                                 if (b) {
                                     delAlarmTime(deviceID, alarm.getSensorRoute(), "0");
                                 }
                                 break;
                             case "4":
-                                firstAlarmTime = (Long) getAlarmTime(deviceID, alarm.getSensorRoute(), "1");
-                                b = pushService.sendOnlineMessage(alarm, firstAlarmTime);
+//                                firstAlarmTime = (Long) getAlarmTime(deviceID, alarm.getSensorRoute(), "1");
+                                b = pushService.sendOnlineMessage(alarm, firstAlarmTimeForOnline);
                                 if (b) {
                                     delAlarmTime(deviceID, alarm.getSensorRoute(), "1");
                                 }
@@ -182,7 +191,7 @@ public class RedisSensorAlarmMessagePushService {
             long currentTime = System.currentTimeMillis();
 //            pushTimes.removeIf(time -> currentTime - (Long) time > JfcloudColdChainConstants.MESS_PUSH_TIME_WINDOW);
             pushTimes.removeIf(time -> currentTime - (Long) time > 3 * 60 * 1000);
-            return pushTimes.size() >= JfcloudColdChainConstants.MESS_PUSH_MAX_PUSH_COUNT;
+            return pushTimes.size() <= JfcloudColdChainConstants.MESS_PUSH_MAX_PUSH_COUNT;
         }
         return false;
     }

+ 187 - 41
snowy-plugin/snowy-plugin-coldchain/src/main/java/vip/xiaonuo/coldchain/core/alarm/websocket/WebSocketEndpoint.java

@@ -1,11 +1,7 @@
 package vip.xiaonuo.coldchain.core.alarm.websocket;
 
 import com.google.gson.JsonObject;
-import jakarta.annotation.PostConstruct;
-import jakarta.websocket.OnClose;
-import jakarta.websocket.OnMessage;
-import jakarta.websocket.OnOpen;
-import jakarta.websocket.Session;
+import jakarta.websocket.*;
 import jakarta.websocket.server.ServerEndpoint;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
@@ -16,75 +12,225 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 @Component
 @ServerEndpoint("/ws/{type}")
 @Slf4j
 public class WebSocketEndpoint {
-    private static final Map<String, Session> sessions = new ConcurrentHashMap<>(); // 管理所有Session
+    private static final Map<String, Session> sessions = new ConcurrentHashMap<>();
+    private static final Map<String, Long> lastActiveTime = new ConcurrentHashMap<>();
 
+    // 心跳配置
     private static final long HEARTBEAT_TIMEOUT = 60 * 1000; // 心跳超时时间(60秒)
 
-    // 记录每个会话的最后活跃时间
-    private static final ConcurrentHashMap<String, Long> lastActiveTime = new ConcurrentHashMap<>();
+    // 定时任务
+    private static ScheduledExecutorService heartbeatScheduler;
+    private static final AtomicBoolean schedulerStarted = new AtomicBoolean(false);
 
     // 连接建立时触发
     @OnOpen
     public void onOpen(Session session) {
-        sessions.put(session.getId(), session);
-        lastActiveTime.put(session.getId(), System.currentTimeMillis());
-        log.info("客户端连接: {}", session.getId());
+        String sessionId = session.getId();
+
+        // 设置会话参数
+        session.setMaxIdleTimeout(300000L); // 5分钟空闲超时
+        session.setMaxTextMessageBufferSize(1024 * 1024); // 1MB缓冲区
+
+        sessions.put(sessionId, session);
+        lastActiveTime.put(sessionId, System.currentTimeMillis());
+
+        log.info("客户端连接: {}, 当前连接数: {}", sessionId, sessions.size());
+
+        // 启动心跳检查(确保只启动一次)
+        startHeartbeatScheduler();
+
+        // 发送连接成功消息
+        sendConnectionSuccess(session);
+    }
+
+    // 发送连接成功消息
+    private void sendConnectionSuccess(Session session) {
+        try {
+            JsonObject jsonObject = new JsonObject();
+            jsonObject.addProperty("msg", "CONNECTED");
+            session.getBasicRemote().sendText(jsonObject.toString());
+        } catch (IOException e) {
+            log.error("发送连接成功消息失败: {}", session.getId(), e);
+        }
     }
 
     // 收到客户端消息时触发
     @OnMessage
     public void onMessage(String message, Session session) {
+        String sessionId = session.getId();
+
+        // 清理空格
         message = message.replaceAll("\\s+", "");
-        if ("HEARTBEAT".equals(message)) {
-            lastActiveTime.put(session.getId(), System.currentTimeMillis());
-            sendMessage("PONG"); // 返回心跳响应[6,8](@ref)
+
+        if ("\"HEARTBEAT\"".equals(message)) {
+            // 更新最后活跃时间
+            lastActiveTime.put(sessionId, System.currentTimeMillis());
+            log.debug("收到心跳: {}", sessionId);
+
+            // 响应心跳(保持原有格式)
+            sendHeartbeatResponse(session);
+        } else {
+            // 处理业务消息
+            lastActiveTime.put(sessionId, System.currentTimeMillis());
+            log.debug("收到业务消息: {}, {}", sessionId, message);
+
+        }
+    }
+
+    // 发送心跳响应
+    private void sendHeartbeatResponse(Session session) {
+        try {
+            JsonObject jsonObject = new JsonObject();
+            jsonObject.addProperty("msg", "PONG");
+            session.getBasicRemote().sendText(jsonObject.toString());
+        } catch (IOException e) {
+            log.error("发送心跳响应失败: {}", session.getId(), e);
         }
     }
 
     // 连接关闭时触发
     @OnClose
     public void onClose(Session session) {
-        sessions.remove(session.getId());
-        lastActiveTime.remove(session.getId());
-        log.info("客户端断开: {}", session.getId());
+        String sessionId = session.getId();
+
+        sessions.remove(sessionId);
+        lastActiveTime.remove(sessionId);
+
+        log.info("客户端断开: {}, 当前连接数: {}", sessionId, sessions.size());
+    }
+
+    // 连接错误时触发
+    @OnError
+    public void onError(Session session, Throwable error) {
+        String sessionId = session.getId();
+
+        log.error("WebSocket错误: {}", sessionId, error);
+
+        // 关闭错误连接
+        if (session.isOpen()) {
+            try {
+                session.close();
+            } catch (IOException e) {
+                log.error("关闭错误连接失败: {}", sessionId, e);
+            }
+        }
+
+        // 清理资源
+        sessions.remove(sessionId);
+        lastActiveTime.remove(sessionId);
+    }
+
+    // 启动心跳调度器
+    private synchronized void startHeartbeatScheduler() {
+        if (schedulerStarted.compareAndSet(false, true)) {
+            heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
+                Thread thread = new Thread(r, "websocket-heartbeat");
+                thread.setDaemon(true);
+                return thread;
+            });
+
+            // 每30秒检查一次心跳超时
+            heartbeatScheduler.scheduleAtFixedRate(() -> {
+                try {
+                    checkHeartbeatTimeout();
+                } catch (Exception e) {
+                    log.error("心跳检查异常", e);
+                }
+            }, 0, 30, TimeUnit.SECONDS);
+
+            log.info("WebSocket心跳检查已启动");
+        }
+    }
+
+    // 检查心跳超时
+    private void checkHeartbeatTimeout() {
+        long currentTime = System.currentTimeMillis();
+
+        sessions.forEach((sessionId, session) -> {
+            Long lastTime = lastActiveTime.get(sessionId);
+            if (lastTime != null && (currentTime - lastTime) > HEARTBEAT_TIMEOUT) {
+                log.warn("心跳超时,关闭连接: {}, 最后活跃: {}ms前",
+                        sessionId, currentTime - lastTime);
+
+                // 关闭超时连接
+                closeSessionQuietly(session);
+
+                // 清理资源
+                sessions.remove(sessionId);
+                lastActiveTime.remove(sessionId);
+            }
+        });
     }
 
-    // 发送消息给客户端
+    // 安静地关闭会话
+    private void closeSessionQuietly(Session session) {
+        if (session != null && session.isOpen()) {
+            try {
+                session.close();
+            } catch (IOException e) {
+                // 忽略关闭异常
+            }
+        }
+    }
+
+    // 发送消息给所有客户端
     public static void sendMessage(String message) {
         sessions.forEach((id, session) -> {
-            JsonObject jsonObject = new JsonObject();
-            jsonObject.addProperty("msg", message);
+            if (session.isOpen()) {
+                try {
+                    JsonObject jsonObject = new JsonObject();
+                    jsonObject.addProperty("msg", message);
+                    session.getBasicRemote().sendText(jsonObject.toString());
+                } catch (IOException e) {
+                    log.error("发送消息失败: {}", id, e);
+                }
+            }
+        });
+    }
+
+    // 发送消息给特定客户端
+    public static boolean sendMessageToSession(String sessionId, String message) {
+        Session session = sessions.get(sessionId);
+        if (session != null && session.isOpen()) {
             try {
+                JsonObject jsonObject = new JsonObject();
+                jsonObject.addProperty("msg", message);
                 session.getBasicRemote().sendText(jsonObject.toString());
+                return true;
             } catch (IOException e) {
-                throw new RuntimeException(e);
+                log.error("发送消息到指定会话失败: {}", sessionId, e);
             }
-        });
+        }
+        return false;
     }
 
-    // 向所有客户端发送刷新指令(静态方法供Service调用)
-    @PostConstruct
-    public void initHeartbeatCheck() {
-        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
-        scheduler.scheduleAtFixedRate(() -> {
-            long currentTime = System.currentTimeMillis();
-            lastActiveTime.forEach((sessionId, lastTime) -> {
-                if (currentTime - lastTime > HEARTBEAT_TIMEOUT) {
-                    Session session = sessions.get(sessionId);
-                    if (session != null) {
-                        try {
-                            session.close(); // 关闭超时连接[6,8](@ref)
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                }
-            });
-        }, 0, 30, TimeUnit.SECONDS); // 每30秒检查一次
+    // Spring容器关闭时清理资源
+    @jakarta.annotation.PreDestroy
+    public void destroy() {
+        log.info("清理WebSocket资源...");
+
+        // 关闭所有连接
+        sessions.forEach((id, session) -> {
+            closeSessionQuietly(session);
+        });
+
+        // 清理Map
+        sessions.clear();
+        lastActiveTime.clear();
+
+        // 关闭调度器
+        if (heartbeatScheduler != null) {
+            heartbeatScheduler.shutdownNow();
+        }
+
+        schedulerStarted.set(false);
+
+        log.info("WebSocket资源清理完成");
     }
 }