소스 검색

emqx-component启动优化

xiwa 2 년 전
부모
커밋
5d106c4e4d

+ 7 - 4
data/components/6c095554-35e7-4e9d-a8d2-bb919e9479f4/component.js

@@ -60,6 +60,12 @@ function acl(head,type,payload){
         };
     }
 
+    // 客户端订阅处理
+    if (/^\/sys\/.+\/.+\/c\/#/i.test(_topic)) {
+	  return subscribe(head,type,payload);
+    }
+
+    // 服务端订阅处理
     if (/^\/sys\/.+\/.+\/s\/.*/i.test(_topic)) {
 	  return subscribe(head,type,payload);
     }
@@ -242,12 +248,9 @@ this.onReceive=function(head,type,payload){
 
     var fun = messageHandler[topic];
 
-
-
     if(fun){
         result = fun(head,type,payload)
-    }
-    else{
+    }else{
         var arr= topic.split('/');
         if(arr.length<6){
             throw new Error("incorrect topic: "+topic)

BIN
data/components/6c095554-35e7-4e9d-a8d2-bb919e9479f4/iot-emqx-component-0.4.3-SNAPSHOT.jar


+ 2 - 0
iot-components/iot-component-server/src/main/java/cc/iotkit/comps/DeviceMessageHandler.java

@@ -70,11 +70,13 @@ public class DeviceMessageHandler implements IMessageHandler {
         scriptEngine.setScript(script);
     }
 
+    @Override
     public void onReceive(Map<String, Object> head, String type, String msg) {
         onReceive(head, type, msg, (r) -> {
         });
     }
 
+    @Override
     public void onReceive(Map<String, Object> head, String type, String msg, Consumer<ReceiveResult> onResult) {
         executorService.submit(() -> {
             try {

+ 1 - 0
iot-components/iot-component-tcp/src/main/java/cc/iotkit/comp/tcp/cilent/TcpClientDeviceComponent.java

@@ -20,6 +20,7 @@ public class TcpClientDeviceComponent extends AbstractDeviceComponent {
     private TcpClientVerticle tcpClientVerticle;
     private String deployedId;
 
+    @Override
     public void create(CompConfig config) {
         super.create(config);
         vertx = Vertx.vertx();

+ 43 - 18
iot-components/iot-emqx-component/src/main/java/cc/iotkit/comp/emqx/EmqxDeviceComponent.java

@@ -11,6 +11,7 @@ package cc.iotkit.comp.emqx;
 
 import cc.iotkit.common.exception.BizException;
 import cc.iotkit.common.utils.JsonUtil;
+import cc.iotkit.common.utils.ThreadUtil;
 import cc.iotkit.comp.AbstractDeviceComponent;
 import cc.iotkit.comp.CompConfig;
 import cc.iotkit.comp.IMessageHandler;
@@ -36,9 +37,11 @@ import java.lang.reflect.InvocationTargetException;
 import java.nio.charset.Charset;
 import java.util.*;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 
-public class EmqxDeviceComponent extends AbstractDeviceComponent {
+public class EmqxDeviceComponent extends AbstractDeviceComponent implements Runnable {
 
     private static final Logger log = LoggerFactory.getLogger(EmqxDeviceComponent.class);
     private Vertx vertx;
@@ -47,12 +50,17 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
     private String deployedId;
     private EmqxConfig mqttConfig;
     private MqttClient client;
+    private boolean mqttConnected = false;
+    private final ScheduledThreadPoolExecutor emqxConnectTask = ThreadUtil.newScheduled(1, "emqx_connect");
 
-    //组件mqtt clientId,默认通过mqtt auth / acl验证。
+    /**
+     * 组件mqtt clientId,默认通过mqtt auth / acl验证。
+     */
     private final Set<String> compMqttClientIdList = new HashSet<>();
 
     private final TransparentConverter transparentConverter = new TransparentConverter();
 
+    @Override
     public void create(CompConfig config) {
         super.create(config);
         vertx = Vertx.vertx();
@@ -64,22 +72,33 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
     public void start() {
         try {
             compMqttClientIdList.add(mqttConfig.getClientId());
-
             authVerticle.setExecutor(getHandler());
             countDownLatch = new CountDownLatch(1);
             Future<String> future = vertx.deployVerticle(authVerticle);
             future.onSuccess((s -> {
                 deployedId = s;
                 countDownLatch.countDown();
-                log.error("start emqx auth component success");
+                log.info("start emqx auth component success");
             }));
             future.onFailure((e) -> {
                 countDownLatch.countDown();
                 log.error("start emqx auth component failed", e);
             });
-
             countDownLatch.await();
 
+            emqxConnectTask.scheduleWithFixedDelay(this, 0, 3, TimeUnit.SECONDS);
+        } catch (Throwable e) {
+            throw new BizException("start emqx auth component error", e);
+        }
+    }
+
+    @Override
+    public void run() {
+        try {
+            if (mqttConnected) {
+                return;
+            }
+
             MqttClientOptions options = new MqttClientOptions()
                     .setClientId(mqttConfig.getClientId())
                     .setUsername(mqttConfig.getUsername())
@@ -101,14 +120,6 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
                 subscribes.put(topic, 1);
             }
 
-            /*subscribes.put("/sys/+/+/s/#", 1);
-            subscribes.put("/sys/client/connected", 1);
-            subscribes.put("/sys/client/disconnected", 1);
-            subscribes.put("/sys/session/subscribed", 1);
-            subscribes.put("/sys/session/unsubscribed", 1);*/
-
-
-            // handler will be called when we have a message in topic we subscribe for
             client.publishHandler(p -> {
                 log.info("Client received message on [{}] payload [{}] with QoS [{}]", p.topicName(), p.payload().toString(Charset.defaultCharset()), p.qosLevel());
 
@@ -131,6 +142,15 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
             client.connect(mqttConfig.getPort(), mqttConfig.getBroker(), s -> {
                 if (s.succeeded()) {
                     log.info("client connect success.");
+                    mqttConnected = true;
+                    /*
+                     * 订阅主题:
+                     * /sys/+/+/s/#
+                     * /sys/client/connected
+                     * /sys/client/disconnected
+                     * /sys/session/subscribed
+                     * /sys/session/unsubscribed
+                     */
                     client.subscribe(subscribes, e -> {
                         if (e.succeeded()) {
                             log.info("===>subscribe success: {}", e.result());
@@ -140,14 +160,13 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
                     });
 
                 } else {
+                    mqttConnected = false;
                     log.error("client connect fail: ", s.cause());
                 }
-            }).exceptionHandler(event -> {
-                log.error("client fail: ", event.getCause());
-            });
+            }).exceptionHandler(event -> log.error("client fail", event));
 
         } catch (Throwable e) {
-            throw new BizException("start emqx auth component error", e);
+            throw new BizException("start emqx component error", e);
         }
     }
 
@@ -157,9 +176,15 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
         authVerticle.stop();
         Future<Void> future = vertx.undeploy(deployedId);
         future.onSuccess(unused -> log.info("stop emqx auth component success"));
+
         client.disconnect()
-                .onSuccess(unused -> log.info("stop emqx component success"))
+                .onSuccess(unused -> {
+                    mqttConnected = false;
+                    log.info("stop emqx component success");
+                })
                 .onFailure(unused -> log.info("stop emqx component failure"));
+
+        emqxConnectTask.shutdown();
     }
 
     @Override

+ 1 - 0
iot-components/iot-mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttDeviceComponent.java

@@ -38,6 +38,7 @@ public class MqttDeviceComponent extends AbstractDeviceComponent {
     private final Map<String, Device> deviceChildToParent = new HashMap<>();
     private final TransparentConverter transparentConverter = new TransparentConverter();
 
+    @Override
     public void create(CompConfig config) {
         super.create(config);
         vertx = Vertx.vertx();

+ 5 - 2
iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/WebSocketDeviceComponent.java

@@ -23,13 +23,13 @@ public class WebSocketDeviceComponent extends AbstractDeviceComponent {
     private CountDownLatch countDownLatch;
     private String deployedId;
     private AbstractDeviceVerticle webSocketVerticle;
-    private String type;
     private final Map<String, Device> deviceChildToParent = new HashMap<>();
 
+    @Override
     public void create(CompConfig config) {
         super.create(config);
         vertx = Vertx.vertx();
-        type= JsonUtil.parse(config.getOther(), Map.class).get("type").toString();
+        String type = JsonUtil.parse(config.getOther(), Map.class).get("type").toString();
         if(AbstractDeviceVerticle.TYPE_CLIENT.equals(type)){
             webSocketVerticle = new WebSocketClientVerticle(config.getOther());
         }else{
@@ -37,6 +37,7 @@ public class WebSocketDeviceComponent extends AbstractDeviceComponent {
         }
     }
 
+    @Override
     public void start() {
         try {
             webSocketVerticle.setExecutor(getHandler());
@@ -57,6 +58,7 @@ public class WebSocketDeviceComponent extends AbstractDeviceComponent {
         }
     }
 
+    @Override
     @SneakyThrows
     public void stop() {
         webSocketVerticle.stop();
@@ -64,6 +66,7 @@ public class WebSocketDeviceComponent extends AbstractDeviceComponent {
         future.onSuccess(unused -> log.info("stop websocket component success"));
     }
 
+    @Override
     public void destroy() {
     }
 

+ 3 - 7
iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/server/WebSocketServerVerticle.java

@@ -6,8 +6,6 @@ import cc.iotkit.common.utils.JsonUtil;
 import cc.iotkit.comp.model.ReceiveResult;
 import cc.iotkit.comp.websocket.AbstractDeviceVerticle;
 import cc.iotkit.converter.DeviceMessage;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.util.JSONPObject;
 import io.vertx.core.Future;
 import io.vertx.core.http.HttpServer;
 import io.vertx.core.http.HttpServerOptions;
@@ -29,7 +27,7 @@ public class WebSocketServerVerticle extends AbstractDeviceVerticle {
 
     private HttpServer httpServer;
 
-    private WebSocketServerConfig webSocketConfig;
+    private final WebSocketServerConfig webSocketConfig;
 
     private final Map<String, ServerWebSocket> wsClients = new ConcurrentHashMap<>();
 
@@ -37,7 +35,7 @@ public class WebSocketServerVerticle extends AbstractDeviceVerticle {
         this.webSocketConfig = JsonUtil.parse(config, WebSocketServerConfig.class);
     }
 
-    private Map<String, String> tokens=new HashMap<>();
+    private final Map<String, String> tokens=new HashMap<>();
 
     @Override
     public void start() throws Exception {
@@ -77,17 +75,15 @@ public class WebSocketServerVerticle extends AbstractDeviceVerticle {
                     }
                     log.warn("认证失败,拒绝");
                     wsClient.writeTextMessage("auth fail");
-                    return;
                 }else{
                     log.warn("认证失败,拒绝");
                     wsClient.writeTextMessage("auth fail");
-                    return;
                 }
 
             });
             wsClient.closeHandler(c -> {
                 log.warn("client connection closed,deviceKey:{}", deviceKey);
-                executor.onReceive(new HashMap<>(), "disconnect", JsonUtil.toJsonString(deviceKeyObj), (r) -> {
+                executor.onReceive(new HashMap<>(0), "disconnect", JsonUtil.toJsonString(deviceKeyObj), (r) -> {
                     //删除设备与连接关系
                     if(r!=null){
                         wsClients.remove(getDeviceKey(r));

+ 1 - 0
iot-test-tool/iot-test-mqtt/src/main/java/cc/iotkit/test/mqtt/performance/ReportTest.java

@@ -34,6 +34,7 @@ public class ReportTest {
 
         if (args.length == 0) {
             Mqtt.brokerHost = "127.0.0.1";
+//            Mqtt.brokerPort = 2883;
 //            Mqtt.brokerHost = "120.76.96.206";
 //            Mqtt.brokerHost = "172.16.1.109";
         } else {