|
@@ -22,6 +22,7 @@ import io.vertx.core.Future;
|
|
|
import io.vertx.core.buffer.Buffer;
|
|
|
import io.vertx.core.net.PemKeyCertOptions;
|
|
|
import io.vertx.mqtt.*;
|
|
|
+import io.vertx.mqtt.messages.codes.MqttDisconnectReasonCode;
|
|
|
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.codec.binary.Hex;
|
|
@@ -114,7 +115,6 @@ public class NBVerticle extends AbstractVerticle {
|
|
|
//删除设备与连接关系
|
|
|
endpointMap.remove(getEndpointKey(r));
|
|
|
});
|
|
|
- endpoint.close();
|
|
|
}).disconnectMessageHandler(disconnectMessage -> {
|
|
|
log.info("Received disconnect from client, reason code = {}", disconnectMessage.code());
|
|
|
executor.onReceive(new HashMap<>(), "disconnect", clientId, (r) -> {
|
|
@@ -159,6 +159,17 @@ public class NBVerticle extends AbstractVerticle {
|
|
|
if (StringUtils.isBlank(payload)) {
|
|
|
return;
|
|
|
}
|
|
|
+ if(Boolean.FALSE.equals(mqttConnectPool.get(clientId))){
|
|
|
+ executor.onReceive(null, "online", clientId);
|
|
|
+ //保存设备与连接关系
|
|
|
+ String productKey = getProductKey(clientId);
|
|
|
+ String deviceName = getDeviceName(clientId);
|
|
|
+ endpointMap.put(getEndpointKey(productKey,deviceName ), endpoint);
|
|
|
+ mqttConnectPool.put(clientId, true);
|
|
|
+ log.info("mqtt client reconnect success,clientId:{}",clientId);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
|
|
|
try {
|
|
|
Map<String, Object> head = new HashMap<>();
|
|
@@ -187,6 +198,16 @@ public class NBVerticle extends AbstractVerticle {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ private String getDeviceName(String clientId) {
|
|
|
+ String[] s = clientId.split("_");
|
|
|
+ return s[0];
|
|
|
+ }
|
|
|
+
|
|
|
+ private String getProductKey(String clientId) {
|
|
|
+ String[] s = clientId.split("_");
|
|
|
+ return s[1];
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void stop() throws Exception {
|
|
|
for (MqttEndpoint endpoint : endpointMap.values()) {
|