瀏覽代碼

mqtt通讯组件修改

xiwa 3 年之前
父節點
當前提交
f658e344dd

+ 5 - 0
manager/pom.xml

@@ -159,6 +159,11 @@
             <artifactId>mqtt-component</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>cc.iotkit</groupId>
+            <artifactId>converter</artifactId>
+        </dependency>
+
     </dependencies>
 
     <build>

+ 1 - 1
manager/src/main/java/cc/iotkit/manager/config/KeycloakSecurityConfig.java

@@ -54,7 +54,7 @@ public class KeycloakSecurityConfig extends KeycloakWebSecurityConfigurerAdapter
         http
                 .authorizeRequests()
                 .antMatchers("/*.html", "/favicon.ico", "/v2/api-docs", "/webjars/**", "/swagger-resources/**", "/*.js").permitAll()
-                .antMatchers("/device_behaviour/**").permitAll()
+                .antMatchers("/protocol/**").permitAll()//todo for test
                 .antMatchers("/**/save*").hasRole("iot_write")
                 .antMatchers("/**/del*").hasRole("iot_write")
                 .antMatchers("/**/add*").hasRole("iot_write")

+ 14 - 4
manager/src/main/java/cc/iotkit/manager/controller/ProtocolController.java

@@ -14,6 +14,7 @@ import cc.iotkit.model.UserInfo;
 import cc.iotkit.model.protocol.ProtocolGateway;
 import cc.iotkit.protocol.server.service.GatewayService;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.domain.Page;
@@ -22,6 +23,8 @@ import org.springframework.data.domain.Sort;
 import org.springframework.web.bind.annotation.*;
 
 import javax.annotation.PostConstruct;
+import java.io.File;
+import java.io.IOException;
 import java.util.Optional;
 
 @Slf4j
@@ -130,13 +133,20 @@ public class ProtocolController {
         return new Paging<>(gateways.getTotalElements(), gateways.getContent());
     }
 
-    @PostConstruct
-    public void init() {
+    @GetMapping("/registerMqtt")
+    public void registerMqtt() throws IOException {
         MqttComponent component = new MqttComponent();
+        component.create("{\"port\":2883,\"ssl\":false}");
         ScriptConverter converter = new ScriptConverter();
-        converter.setScript("");
+        converter.setScript(FileUtils.readFileToString(new File("/Users/sjg/home/gitee/open-source/converter.js"), "UTF-8"));
         component.setConverter(converter);
         componentManager.register("123", component);
-        componentManager.start("123", "");
+        componentManager.start("123", FileUtils.readFileToString(new File("/Users/sjg/home/gitee/open-source/component.js"), "UTF-8"));
+    }
+
+    @GetMapping("/deregisterMqtt")
+    public void deregisterMqtt() {
+        componentManager.stop("123");
+        componentManager.deRegister("123");
     }
 }

+ 12 - 11
protocol-gateway/.DS_Store

@@ -1,5 +1,6 @@
 package cc.iotkit.comps;
 
+import cc.iotkit.common.exception.BizException;
 import cc.iotkit.comp.IMessageHandler;
 import cc.iotkit.comp.model.DeviceMessage;
 import cc.iotkit.comps.model.AuthInfo;
@@ -23,7 +24,7 @@ import java.util.Map;
 public class MessageHandler implements IMessageHandler {
     private final NashornScriptEngine engine = (NashornScriptEngine) (new ScriptEngineManager()).getEngineByName("nashorn");
 
-    private final String script;
+    private final Object scriptObj;
 
     private final IConverter converter;
 
@@ -32,10 +33,9 @@ public class MessageHandler implements IMessageHandler {
     @SneakyThrows
     public MessageHandler(String script, IConverter converter,
                           DeviceBehaviourService deviceBehaviourService) {
-        this.script = script;
         this.converter = converter;
         this.deviceBehaviourService = deviceBehaviourService;
-        engine.eval(script);
+        scriptObj = engine.eval(script);
     }
 
     public void register(Map<String, Object> head, String msg) {
@@ -49,7 +49,7 @@ public class MessageHandler implements IMessageHandler {
 
     public void onReceive(Map<String, Object> head, String type, String msg) {
         try {
-            ScriptObjectMirror result = (ScriptObjectMirror) engine.invokeFunction("onReceive", head, type, msg);
+            ScriptObjectMirror result = (ScriptObjectMirror) engine.invokeMethod(scriptObj, "onReceive", head, type, msg);
             Object rstType = result.get("type");
             if (rstType == null) {
                 return;
@@ -57,7 +57,7 @@ public class MessageHandler implements IMessageHandler {
             //取脚本执行后返回的数据
             Object data = result.get("data");
             if (!(data instanceof Map)) {
-                return;
+                throw new BizException("script result data is incorrect");
             }
             Map<String, Object> dataMap = (Map) data;
 
@@ -83,18 +83,20 @@ public class MessageHandler implements IMessageHandler {
                 doReport(message);
             }
 
+        } catch (BizException e) {
+            throw e;
         } catch (Throwable e) {
-            log.error("onReceive error", e);
+            throw new BizException("receive component message error", e);
         }
     }
 
     private void doRegister(RegisterInfo reg) throws ScriptException, NoSuchMethodException {
         try {
             deviceBehaviourService.register(reg);
-            engine.invokeFunction("onRegistered", reg, true);
+            engine.invokeMethod(scriptObj, "onRegistered", reg, "true");
         } catch (Throwable e) {
             log.error("register error", e);
-            engine.invokeFunction("onRegistered", reg, false);
+            engine.invokeMethod(scriptObj, "onRegistered", reg, "false");
         }
     }
 
@@ -104,10 +106,10 @@ public class MessageHandler implements IMessageHandler {
                     auth.getDeviceName(),
                     auth.getProductSecret(),
                     auth.getDeviceSecret());
-            engine.invokeFunction("onAuthed", auth, true);
+            engine.invokeMethod(scriptObj, "onAuthed", auth, true);
         } catch (Throwable e) {
             log.error("device auth error", e);
-            engine.invokeFunction("onAuthed", auth, false);
+            engine.invokeMethod(scriptObj, "onAuthed", auth, false);
         }
     }
 
@@ -128,5 +130,4 @@ public class MessageHandler implements IMessageHandler {
             log.error("report device message error", e);
         }
     }
-
 }

+ 3 - 2
protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/AuthInfo.java

@@ -104,10 +104,11 @@ public class DeviceBehaviourService {
         } else {
             //不存在,注册新设备
             device = new DeviceInfo();
+            device.setId(newDeviceId(info.getDeviceName()));
             device.setParentId(parentId);
             device.setUid(uid);
-            device.setDeviceId(newDeviceId(info.getDeviceName()));
-            device.setProductKey(info.getProductKey());
+            device.setDeviceId(device.getId());
+            device.setProductKey(pk);
             device.setDeviceName(info.getDeviceName());
             device.setTag(info.getTag());
             device.setState(new DeviceInfo.State(false, null, null));

+ 11 - 5
protocol-gateway/component/pom.xml

@@ -3,8 +3,10 @@ package cc.iotkit.comp.mqtt;
 import cc.iotkit.common.exception.BizException;
 import cc.iotkit.common.utils.JsonUtil;
 import cc.iotkit.comp.AbstractComponent;
+import cc.iotkit.comp.IMessageHandler;
 import io.vertx.core.Future;
 import io.vertx.core.Vertx;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.concurrent.CountDownLatch;
@@ -13,18 +15,21 @@ import java.util.concurrent.CountDownLatch;
 public class MqttComponent extends AbstractComponent {
 
     private Vertx vertx;
-    private final CountDownLatch countDownLatch = new CountDownLatch(1);
+    private CountDownLatch countDownLatch;
     private String deployedId;
-    private MqttConfig mqttConfig;
+    private MqttVerticle mqttVerticle;
 
     public void create(String config) {
         vertx = Vertx.vertx();
-        mqttConfig = JsonUtil.parse(config, MqttConfig.class);
+        MqttConfig mqttConfig = JsonUtil.parse(config, MqttConfig.class);
+        mqttVerticle = new MqttVerticle(mqttConfig);
     }
 
     public void start() {
         try {
-            Future<String> future = vertx.deployVerticle(new MqttVerticle(mqttConfig, getHandler()));
+            mqttVerticle.setExecutor(getHandler());
+            countDownLatch = new CountDownLatch(1);
+            Future<String> future = vertx.deployVerticle(mqttVerticle);
             future.onSuccess((s -> {
                 deployedId = s;
                 countDownLatch.countDown();
@@ -40,13 +45,14 @@ public class MqttComponent extends AbstractComponent {
         }
     }
 
+    @SneakyThrows
     public void stop() {
+        mqttVerticle.stop();
         Future<Void> future = vertx.undeploy(deployedId);
         future.onSuccess(unused -> log.info("stop mqtt component success"));
     }
 
     public void destroy() {
-        vertx.close();
     }
 
 }

+ 6 - 2
protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java

@@ -26,10 +26,13 @@ public class MqttVerticle extends AbstractVerticle {
 
     private final MqttConfig config;
 
-    private final IMessageHandler executor;
+    private IMessageHandler executor;
 
-    public MqttVerticle(MqttConfig config, IMessageHandler executor) {
+    public MqttVerticle(MqttConfig config) {
         this.config = config;
+    }
+
+    public void setExecutor(IMessageHandler executor) {
         this.executor = executor;
     }
 
@@ -62,6 +65,7 @@ public class MqttVerticle extends AbstractVerticle {
             } catch (Throwable e) {
                 log.error("auth failed", e);
                 endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
+                return;
             }
 
             log.info("MQTT client keep alive timeout = {} ", endpoint.keepAliveTimeSeconds());

文件差異過大導致無法顯示
+ 1 - 0
protocol-gateway/mqtt-component/src/main/resources/component.js


+ 51 - 0
protocol-gateway/mqtt-component/src/main/resources/converter.js

@@ -0,0 +1,51 @@
+new (function () {
+    this.decode = function (msg) {
+      //对msg进行解析,并返回物模型数据
+      var mqttMsg = JSON.parse(msg.content);
+      var topic = mqttMsg.topic;
+      var payload = mqttMsg.payload;
+  
+      if (topic.endsWith("/property/post")) {
+        //属性上报
+        return {
+          mid: msg.mid,
+          productKey: msg.productKey, //可根据消息内容判断填写不同产品
+          deviceName: msg.deviceName,
+          type:"property",
+          identifier: "report", //属性上报
+          occur: new Date().getTime(), //时间戳,设备上的事件或数据产生的本地时间
+          time: new Date().getTime(), //时间戳,消息上报时间
+          data: payload.params,
+        };
+      } else if (topic.indexOf("/event/") > 0) {
+        var identifier = topic.substring(topic.lastIndexOf("/") + 1);
+        //事件上报
+        return {
+          mid: msg.mid,
+          productKey: msg.productKey,
+          deviceName: msg.deviceName,
+          type:"event",
+          identifier: identifier,
+          occur: new Date().getTime(),
+          time: new Date().getTime(),
+          data: payload.params,
+        };
+      } else if (topic.endsWith("_reply")) {
+        var identifier = topic.substring(topic.lastIndexOf("/") + 1);
+        //服务回复
+        return {
+          mid: msg.mid,
+          productKey: msg.productKey,
+          deviceName: msg.deviceName,
+          type:"service",
+          identifier: identifier.replace("_reply", "Reply"),
+          occur: new Date().getTime(),
+          time: new Date().getTime(),
+          code: payload.code,
+          data: payload.data,
+        };
+      }
+      return null;
+    };
+  })()
+  

部分文件因文件數量過多而無法顯示