Prechádzať zdrojové kódy

功能新增
1、通讯协议中新增websocket协议

tangfudong 2 rokov pred
rodič
commit
883205a810

+ 7 - 0
iot-components/iot-component-server/src/main/java/cc/iotkit/comps/DeviceMessageHandler.java

@@ -138,6 +138,13 @@ public class DeviceMessageHandler implements IMessageHandler {
                     doAction(action);
                     onResult.accept(new ReceiveResult(message.getProductKey(), message.getDeviceName(), message));
                     return;
+                } else if ("action".equals(rstType)) {
+                    //纯做回复操作
+                    DeviceMessage message = new DeviceMessage();
+                    BeanUtils.populate(message, dataMap);
+                    doAction(action);
+                    onResult.accept(new ReceiveResult(message.getProductKey(), message.getDeviceName(), message));
+                    return;
                 }
 
             } catch (Throwable e) {

+ 89 - 0
iot-components/iot-websocket-component/pom.xml

@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>iot-components</artifactId>
+        <groupId>cc.iotkit</groupId>
+        <version>0.4.2-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>iot-websocket-component</artifactId>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>io.vertx</groupId>
+            <artifactId>vertx-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.luaj</groupId>
+            <artifactId>luaj-jse</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>cc.iotkit</groupId>
+            <artifactId>iot-common</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>cc.iotkit</groupId>
+            <artifactId>iot-component-base</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>cc.iotkit</groupId>
+            <artifactId>iot-data-service</artifactId>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.4</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <artifactSet>
+                        <includes>
+                            <include>io.vertx:vertx-core</include>
+                            <include>org.luaj:luaj-jse</include>
+                        </includes>
+                    </artifactSet>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>11</source>
+                    <target>11</target>
+                    <forceJavacCompilerUse>true</forceJavacCompilerUse>
+                    <useIncrementalCompilation>false</useIncrementalCompilation>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 18 - 0
iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/AbstractDeviceVerticle.java

@@ -0,0 +1,18 @@
+package cc.iotkit.comp.websocket;
+
+import cc.iotkit.comp.IMessageHandler;
+import cc.iotkit.converter.DeviceMessage;
+import io.vertx.core.AbstractVerticle;
+import lombok.Data;
+
+@Data
+public abstract class AbstractDeviceVerticle extends AbstractVerticle {
+
+    public static final String TYPE_SERVER = "server";
+    public static final String TYPE_CLIENT = "client";
+
+    protected IMessageHandler executor;
+
+    public abstract DeviceMessage send(DeviceMessage message);
+
+}

+ 111 - 0
iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/WebSocketDeviceComponent.java

@@ -0,0 +1,111 @@
+package cc.iotkit.comp.websocket;
+
+import cc.iotkit.common.exception.BizException;
+import cc.iotkit.common.utils.JsonUtil;
+import cc.iotkit.comp.AbstractDeviceComponent;
+import cc.iotkit.comp.CompConfig;
+import cc.iotkit.comp.model.DeviceState;
+import cc.iotkit.comp.websocket.client.WebSocketClientVerticle;
+import cc.iotkit.comp.websocket.server.WebSocketServerVerticle;
+import cc.iotkit.converter.DeviceMessage;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import lombok.*;
+import lombok.extern.slf4j.Slf4j;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+@Slf4j
+public class WebSocketDeviceComponent extends AbstractDeviceComponent {
+
+    private Vertx vertx;
+    private CountDownLatch countDownLatch;
+    private String deployedId;
+    private AbstractDeviceVerticle webSocketVerticle;
+    private String type;
+    private final Map<String, Device> deviceChildToParent = new HashMap<>();
+
+    public void create(CompConfig config) {
+        super.create(config);
+        vertx = Vertx.vertx();
+        type= JsonUtil.parse(config.getOther(), Map.class).get("type").toString();
+        if(AbstractDeviceVerticle.TYPE_CLIENT.equals(type)){
+            webSocketVerticle = new WebSocketClientVerticle(config.getOther());
+        }else{
+            webSocketVerticle = new WebSocketServerVerticle(config.getOther());
+        }
+    }
+
+    public void start() {
+        try {
+            webSocketVerticle.setExecutor(getHandler());
+            countDownLatch = new CountDownLatch(1);
+            Future<String> future = vertx.deployVerticle(webSocketVerticle);
+            future.onSuccess((s -> {
+                deployedId = s;
+                countDownLatch.countDown();
+            }));
+            future.onFailure((e) -> {
+                countDownLatch.countDown();
+                log.error("start websocket component failed", e);
+            });
+            countDownLatch.await();
+            future.succeeded();
+        } catch (Throwable e) {
+            throw new BizException("start websocket component error", e);
+        }
+    }
+
+    @SneakyThrows
+    public void stop() {
+        webSocketVerticle.stop();
+        Future<Void> future = vertx.undeploy(deployedId);
+        future.onSuccess(unused -> log.info("stop websocket component success"));
+    }
+
+    public void destroy() {
+    }
+
+    @Override
+    public void onDeviceStateChange(DeviceState state) {
+        DeviceState.Parent parent = state.getParent();
+        if (parent == null) {
+            return;
+        }
+        Device device = new Device(state.getProductKey(), state.getDeviceName());
+
+        if (DeviceState.STATE_ONLINE.equals(state.getState())) {
+            //保存子设备所属父设备
+            deviceChildToParent.put(device.toString(),
+                    new Device(parent.getProductKey(), parent.getDeviceName())
+            );
+        } else {
+            //删除关系
+            deviceChildToParent.remove(device.toString());
+        }
+
+    }
+
+    @Override
+    public DeviceMessage send(DeviceMessage message) {
+        webSocketVerticle.send(message);
+        return message;
+    }
+
+    @Override
+    public CompConfig getConfig() {
+        return config;
+    }
+
+
+    @Data
+    @NoArgsConstructor
+    @AllArgsConstructor
+    @ToString
+    public static class Device {
+        private String productKey;
+        private String deviceName;
+    }
+
+}

+ 20 - 0
iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/client/WebSocketClientConfig.java

@@ -0,0 +1,20 @@
+package cc.iotkit.comp.websocket.client;
+
+import lombok.Data;
+
+@Data
+public class WebSocketClientConfig {
+
+    private int port;
+
+    private String ip;
+
+    private String url;
+
+    private long heartBeatTime;
+
+    private String heartBeatData;
+
+    private boolean ssl;
+
+}

+ 124 - 0
iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/client/WebSocketClientVerticle.java

@@ -0,0 +1,124 @@
+package cc.iotkit.comp.websocket.client;
+
+import cc.iotkit.common.exception.BizException;
+import cc.iotkit.common.utils.JsonUtil;
+import cc.iotkit.comp.model.ReceiveResult;
+import cc.iotkit.comp.model.RegisterInfo;
+import cc.iotkit.comp.websocket.AbstractDeviceVerticle;
+import cc.iotkit.converter.DeviceMessage;
+import io.vertx.core.Future;
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.http.HttpClientOptions;
+import io.vertx.core.http.WebSocket;
+import io.vertx.core.http.WebSocketConnectOptions;
+import lombok.*;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+public class WebSocketClientVerticle extends AbstractDeviceVerticle {
+
+    private HttpClient httpClient;
+
+    private WebSocket webSocketClient;
+
+    private WebSocketClientConfig webSocketConfig;
+
+    private long timerID;
+
+    private final Map<String, Device> devices = new ConcurrentHashMap<>();
+
+    public void setWebSocketClient(WebSocket webSocketClient) {
+        this.webSocketClient = webSocketClient;
+    }
+
+    public WebSocketClientVerticle(String config) {
+        this.webSocketConfig = JsonUtil.parse(config, WebSocketClientConfig.class);
+    }
+
+    public void start() {
+        WebSocketConnectOptions options = new WebSocketConnectOptions().setPort(webSocketConfig.getPort())
+                .setHost(webSocketConfig.getIp()).setURI(webSocketConfig.getUrl()).setSsl(webSocketConfig.isSsl());
+        httpClient = vertx.createHttpClient();
+        httpClient.webSocket(options).onSuccess(ws -> {
+            setWebSocketClient(ws);
+            log.info("webSocket client connect success!");
+            ws.textMessageHandler(data -> {
+                log.info("webSocket client receive msg:" + data);
+                executor.onReceive(new HashMap<>(), null, data, (ret) -> {
+                    if (ret != null && ret.getData() instanceof RegisterInfo) {
+                        executor.onReceive(null, "connected", data, (r) -> {
+                            if (!devices.containsKey(getDeviceKey(r))) {
+                                devices.put(getDeviceKey(r), new Device(r.getDeviceName(), r.getProductKey()));
+                            }
+                        });
+                    }
+                });
+            });
+            ws.closeHandler(e -> {
+                for (String deviceKey : devices.keySet()) {
+                    executor.onReceive(null, "disconnect", deviceKey);
+                }
+                log.warn("client connection closed!");
+            });
+            ws.exceptionHandler(e -> {
+                for (String deviceKey : devices.keySet()) {
+                    executor.onReceive(null, "disconnect", deviceKey);
+                }
+                log.error("webSocket client connect exception!");
+            });
+            if (webSocketConfig.getHeartBeatTime() > 0 && StringUtils.isNotBlank(webSocketConfig.getHeartBeatData())) {
+                timerID = vertx.setPeriodic(webSocketConfig.getHeartBeatTime(), t -> {
+                    if (webSocketClient.isClosed()) {
+                        vertx.cancelTimer(timerID);
+                    }
+                    executor.onReceive(new HashMap<>(), "ping", JsonUtil.toJsonString(webSocketConfig));
+                });
+            }
+        }).onFailure(e -> {
+            log.info("webSocket client connect failed!");
+        });
+    }
+
+    @SneakyThrows
+    public void stop() {
+        vertx.cancelTimer(timerID);
+        for (String deviceKey : devices.keySet()) {
+            executor.onReceive(null, "disconnect", deviceKey);
+        }
+        httpClient.close();
+    }
+
+    @Override
+    public DeviceMessage send(DeviceMessage message) {
+        Object obj = message.getContent();
+        if (!(obj instanceof Map)) {
+            throw new BizException("message content is not Map");
+        }
+        String msgStr = JsonUtil.toJsonString(obj);
+        log.info("send msg payload:{}", msgStr);
+        Future<Void> result = webSocketClient.writeTextMessage(msgStr);
+        result.onFailure(e -> log.error("webSocket client send msg failed", e));
+        return message;
+    }
+
+    private String getDeviceKey(ReceiveResult result) {
+        return getDeviceKey(result.getProductKey(), result.getDeviceName());
+    }
+
+    private String getDeviceKey(String productKey, String deviceName) {
+        return String.format("%s_%s", productKey, deviceName);
+    }
+
+    @Data
+    @NoArgsConstructor
+    @AllArgsConstructor
+    @ToString
+    public static class Device {
+        private String productKey;
+        private String deviceName;
+    }
+}

+ 16 - 0
iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/server/WebSocketServerConfig.java

@@ -0,0 +1,16 @@
+package cc.iotkit.comp.websocket.server;
+
+import lombok.Data;
+
+@Data
+public class WebSocketServerConfig {
+
+    private int port;
+
+    private String sslKey;
+
+    private String sslCert;
+
+    private boolean ssl;
+
+}

+ 111 - 0
iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/server/WebSocketServerVerticle.java

@@ -0,0 +1,111 @@
+package cc.iotkit.comp.websocket.server;
+
+
+import cc.iotkit.common.exception.BizException;
+import cc.iotkit.common.utils.JsonUtil;
+import cc.iotkit.comp.model.ReceiveResult;
+import cc.iotkit.comp.websocket.AbstractDeviceVerticle;
+import cc.iotkit.converter.DeviceMessage;
+import io.vertx.core.Future;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.core.http.ServerWebSocket;
+import io.vertx.core.net.PemKeyCertOptions;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+@Slf4j
+public class WebSocketServerVerticle extends AbstractDeviceVerticle {
+
+
+    private HttpServer httpServer;
+
+    private WebSocketServerConfig webSocketConfig;
+
+    private final Map<String, ServerWebSocket> wsClients = new ConcurrentHashMap<>();
+
+    public WebSocketServerVerticle(String config) {
+        this.webSocketConfig = JsonUtil.parse(config, WebSocketServerConfig.class);
+    }
+
+    @Override
+    public void start() throws Exception {
+        HttpServerOptions options = new HttpServerOptions()
+                .setPort(webSocketConfig.getPort());
+        if (webSocketConfig.isSsl()) {
+            options = options.setSsl(true)
+                    .setKeyCertOptions(new PemKeyCertOptions()
+                            .setKeyPath(webSocketConfig.getSslKey())
+                            .setCertPath(webSocketConfig.getSslCert()));
+        }
+        httpServer = vertx.createHttpServer(options).webSocketHandler(wsClient -> {
+            log.info("webSocket client connect sessionId:{},path={}", wsClient.textHandlerID(), wsClient.path());
+            String deviceKey = wsClient.headers().get("deviceKey");
+            wsClient.textMessageHandler(message -> {
+                executor.onReceive(new HashMap<>(), "auth", deviceKey, (r) -> {
+                    if (r == null) {
+                        //认证失败
+                        wsClient.reject();
+                        return;
+                    }
+                    //保存设备与连接关系
+                    wsClients.put(getDeviceKey(r), wsClient);
+                    executor.onReceive(new HashMap<>(), "", message);
+                });
+            });
+            wsClient.closeHandler(c -> {
+                log.warn("client connection closed,deviceKey:{}", deviceKey);
+                executor.onReceive(new HashMap<>(), "disconnect", deviceKey, (r) -> {
+                    //删除设备与连接关系
+                    wsClients.remove(getDeviceKey(r));
+                });
+            });
+            wsClient.endHandler(e -> {
+                log.warn("webSocket client connection end,deviceKey:{}", deviceKey);
+            });
+            wsClient.exceptionHandler(ex -> {
+                log.warn("webSocket client connection exception,deviceKey:{}", deviceKey);
+            });
+        }).listen(webSocketConfig.getPort(), server -> {
+            if (server.succeeded()) {
+                log.info("webSocket server is listening on port " + webSocketConfig.getPort());
+            } else {
+                log.error("webSocket server on starting the server", server.cause());
+            }
+        });
+    }
+
+    @Override
+    public void stop() throws Exception {
+        for (String deviceKey : wsClients.keySet()) {
+            executor.onReceive(null, "disconnect", deviceKey);
+        }
+        httpServer.close(voidAsyncResult -> log.info("close webocket server..."));
+    }
+
+    private String getDeviceKey(ReceiveResult result) {
+        return getDeviceKey(result.getProductKey(), result.getDeviceName());
+    }
+
+    private String getDeviceKey(String productKey, String deviceName) {
+        return String.format("%s_%s", productKey, deviceName);
+    }
+
+    @Override
+    public DeviceMessage send(DeviceMessage message) {
+        ServerWebSocket wsClient = wsClients.get(getDeviceKey(message.getProductKey(), message.getDeviceName()));
+        Object obj = message.getContent();
+        if (!(obj instanceof Map)) {
+            throw new BizException("message content is not Map");
+        }
+        String msgStr = JsonUtil.toJsonString(obj);
+        log.info("send msg payload:{}", msgStr);
+        Future<Void> result = wsClient.writeTextMessage(msgStr);
+        result.onFailure(e -> log.error("webSocket server send msg failed", e));
+        return message;
+    }
+}

+ 72 - 0
iot-components/iot-websocket-component/src/main/resources/component.js

@@ -0,0 +1,72 @@
+var mid=1;
+
+var access_token="";
+
+function getMid(){
+	mid++;
+	if(mid>10000){
+		mid=1;
+	}
+	return mid;
+};
+function getPingData(data){
+	var ping={
+		productKey:"",
+		deviceName:"",
+		content:{
+			id:getMid(),
+			type:data
+		}
+	};
+	return {
+		type:"action",
+		data:{
+			productKey:"",
+			deviceName:"",
+			state:""
+		},
+		action:{
+			type:"ack",
+			content:JSON.stringify(ping)
+		}
+	}
+};
+//必须提供onReceive方法
+this.onReceive=function(head,type,payload){
+	var data=JSON.parse(payload)
+	if(data.type=="auth_required"){
+		var auth={
+			productKey:"",
+			deviceName:"",
+			content:{
+				type:"auth",
+				access_token:access_token
+			}
+		};
+		return {
+			type:"action",
+			data:{
+				productKey:"",
+				deviceName:"",
+				state:""
+			},
+			action:{
+				type:"ack",
+				content:JSON.stringify(auth)
+			}
+		}
+	}else if(data.type=="auth_ok"){
+		return getPingData(data.heartBeatData);
+	}else if(data.type=="pong"){
+		apiTool.log("receive pong!");
+	}else if("ping"==type){
+		return getPingData(data.heartBeatData);
+	}
+	return {
+		productKey:"",
+		deviceName:"",
+		mid:0,
+		content:{
+		}
+	}
+};

+ 1 - 0
iot-components/iot-websocket-component/src/main/resources/component.spi

@@ -0,0 +1 @@
+cc.iotkit.comp.websocket.WebSocketDeviceComponent

+ 55 - 0
iot-components/iot-websocket-component/src/main/resources/converter.js

@@ -0,0 +1,55 @@
+
+var mid=1;
+
+function getMid(){
+	mid++;
+	if(mid>10000){
+		mid=1;
+	}
+	return mid+"";
+}
+
+this.decode = function (msg) {
+	//对msg进行解析,并返回物模型数据
+	var content=msg.content;
+	var type = content.type;
+
+	if (type=="report") {
+		//属性上报
+		return {
+			mid: msg.mid,
+			productKey: msg.productKey,
+			deviceName: msg.deviceName,
+			type:"property",
+			identifier: "report", //属性上报
+			occur: new Date().getTime(), //时间戳,设备上的事件或数据产生的本地时间
+			time: new Date().getTime(), //时间戳,消息上报时间
+			data: content.params,
+		};
+	}
+	return null;
+};
+
+this.encode = function (service,device) {
+	var type=service.type;
+	var identifier=service.identifier;
+	var entityId=service.deviceName;
+	var deviceMid=getMid();
+	var params={};
+	var target={};
+	if("property"==type&&"set"==identifier){
+		var domain=entityId.split(".")[0];
+		var powerstate=service.params.powerstate==1?"turn_on":"turn_off";
+		params.type="call_service";
+		params.domain=domain;
+		params.service=powerstate;
+		target.entity_id=entityId;
+		params.target=target;
+	}
+	return {
+		productKey:service.productKey,
+		deviceName:service.deviceName,
+		mid:deviceMid,
+		content:params
+	}
+};