浏览代码

修复设备上下线状态消息重复问题

15173476162 2 年之前
父节点
当前提交
a25cb06dd7

+ 2 - 0
iot-components/iot-component-server/src/main/java/cc/iotkit/comps/DeviceMessageHandler.java

@@ -199,6 +199,8 @@ public class DeviceMessageHandler implements IMessageHandler {
             } else {
                 deviceRouter.removeRouter(pk, dn);
             }
+            // 避免已在线多此发送上线消息
+            if (isOnline == deviceBehaviourService.isOnline(pk, dn)) return;
             component.onDeviceStateChange(state);
             deviceBehaviourService.deviceStateChange(pk, dn, isOnline);
         } catch (Throwable e) {

+ 7 - 0
iot-components/iot-component-server/src/main/java/cc/iotkit/comps/service/DeviceBehaviourService.java

@@ -175,6 +175,13 @@ public class DeviceBehaviourService {
 
     }
 
+    public boolean isOnline(String productKey,
+                            String deviceName){
+        DeviceInfo device = deviceInfoData.findByProductKeyAndDeviceName(productKey, deviceName);
+        DeviceInfo deviceInfo = deviceInfoData.findByDeviceId(device.getDeviceId());
+        return deviceInfo.getState().isOnline();
+    }
+
     public void deviceStateChange(String productKey,
                                   String deviceName,
                                   boolean online) {

+ 7 - 0
iot-components/iot-mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java

@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 @Slf4j
 public class MqttVerticle extends AbstractVerticle {
@@ -41,6 +42,9 @@ public class MqttVerticle extends AbstractVerticle {
 
     private final Map<String, MqttEndpoint> endpointMap = new HashMap<>();
 
+    // 增加一个客户端连接clientid-连接状态池,避免mqtt关闭的时候走异常断开和mqtt断开的handler,导致多次离线消息
+    private static final Map<String, Boolean> mqttConnectPool = new ConcurrentHashMap<>();
+
     public MqttVerticle(MqttConfig config) {
         this.config = config;
     }
@@ -84,6 +88,7 @@ public class MqttVerticle extends AbstractVerticle {
                     }
                     //保存设备与连接关系
                     endpointMap.put(getEndpointKey(r), endpoint);
+                    mqttConnectPool.put(clientId, true);
                 });
             } catch (Throwable e) {
                 log.error("auth failed", e);
@@ -96,6 +101,7 @@ public class MqttVerticle extends AbstractVerticle {
             endpoint.accept(false);
             endpoint.closeHandler((v) -> {
                 log.warn("client connection closed,clientId:{}", clientId);
+                if (mqttConnectPool.get(clientId) == false) return;
                 executor.onReceive(new HashMap<>(), "disconnect", clientId, (r) -> {
                     //删除设备与连接关系
                     endpointMap.remove(getEndpointKey(r));
@@ -105,6 +111,7 @@ public class MqttVerticle extends AbstractVerticle {
                 executor.onReceive(new HashMap<>(), "disconnect", clientId, (r) -> {
                     //删除设备与连接关系
                     endpointMap.remove(getEndpointKey(r));
+                    mqttConnectPool.put(clientId, false);
                 });
             }).subscribeHandler(subscribe -> {
                 List<MqttSubAckReasonCode> reasonCodes = new ArrayList<>();