|
@@ -6,6 +6,8 @@ import cc.iotkit.common.utils.JsonUtil;
|
|
|
import cc.iotkit.comp.model.ReceiveResult;
|
|
|
import cc.iotkit.comp.websocket.AbstractDeviceVerticle;
|
|
|
import cc.iotkit.converter.DeviceMessage;
|
|
|
+import com.fasterxml.jackson.databind.JsonNode;
|
|
|
+import com.fasterxml.jackson.databind.util.JSONPObject;
|
|
|
import io.vertx.core.Future;
|
|
|
import io.vertx.core.http.HttpServer;
|
|
|
import io.vertx.core.http.HttpServerOptions;
|
|
@@ -15,7 +17,9 @@ import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
|
|
@@ -33,6 +37,8 @@ public class WebSocketServerVerticle extends AbstractDeviceVerticle {
|
|
|
this.webSocketConfig = JsonUtil.parse(config, WebSocketServerConfig.class);
|
|
|
}
|
|
|
|
|
|
+ private Map<String, String> tokens=new HashMap<>();
|
|
|
+
|
|
|
@Override
|
|
|
public void start() throws Exception {
|
|
|
HttpServerOptions options = new HttpServerOptions()
|
|
@@ -46,31 +52,46 @@ public class WebSocketServerVerticle extends AbstractDeviceVerticle {
|
|
|
httpServer = vertx.createHttpServer(options).webSocketHandler(wsClient -> {
|
|
|
log.info("webSocket client connect sessionId:{},path={}", wsClient.textHandlerID(), wsClient.path());
|
|
|
String deviceKey = wsClient.path().replace("/","");
|
|
|
- if(StringUtils.isBlank(deviceKey)||deviceKey.split("_").length<2){
|
|
|
- wsClient.reject();
|
|
|
+ if(StringUtils.isBlank(deviceKey)||deviceKey.split("_").length!=2){
|
|
|
log.warn("陌生连接,拒绝");
|
|
|
+ wsClient.reject();
|
|
|
return;
|
|
|
}
|
|
|
+ wsClient.writeTextMessage("connect succes! please auth!");
|
|
|
Map<String,String> deviceKeyObj=new HashMap<>();
|
|
|
deviceKeyObj.put("deviceKey",deviceKey);
|
|
|
- executor.onReceive(new HashMap<>(), "auth", JsonUtil.toJsonString(deviceKeyObj), (r) -> {
|
|
|
- if (r == null) {
|
|
|
- //认证失败
|
|
|
+ wsClient.textMessageHandler(message -> {
|
|
|
+ HashMap<String,String> msg= JsonUtil.parse(message,HashMap.class);
|
|
|
+ if(wsClients.containsKey(deviceKey)){
|
|
|
+ executor.onReceive(new HashMap<>(), "", message);
|
|
|
+ }else if(msg!=null&&"auth".equals(msg.get("type"))){
|
|
|
+ Set<String> tokenKey=tokens.keySet();
|
|
|
+ for(String key:tokenKey){
|
|
|
+ if(StringUtils.isNotBlank(msg.get(key))&&tokens.get(key).equals(msg.get(key))){
|
|
|
+ //保存设备与连接关系
|
|
|
+ log.info("认证通过");
|
|
|
+ wsClients.put(deviceKey, wsClient);
|
|
|
+ wsClient.writeTextMessage("auth succes");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.warn("认证失败,拒绝");
|
|
|
+ wsClient.writeTextMessage("auth fail");
|
|
|
+ return;
|
|
|
+ }else{
|
|
|
log.warn("认证失败,拒绝");
|
|
|
- wsClient.reject();
|
|
|
+ wsClient.writeTextMessage("auth fail");
|
|
|
return;
|
|
|
}
|
|
|
- //保存设备与连接关系
|
|
|
- wsClients.put(getDeviceKey(r), wsClient);
|
|
|
- });
|
|
|
- wsClient.textMessageHandler(message -> {
|
|
|
- executor.onReceive(new HashMap<>(), "", message);
|
|
|
+
|
|
|
});
|
|
|
wsClient.closeHandler(c -> {
|
|
|
log.warn("client connection closed,deviceKey:{}", deviceKey);
|
|
|
executor.onReceive(new HashMap<>(), "disconnect", JsonUtil.toJsonString(deviceKeyObj), (r) -> {
|
|
|
//删除设备与连接关系
|
|
|
- wsClients.remove(getDeviceKey(r));
|
|
|
+ if(r!=null){
|
|
|
+ wsClients.remove(getDeviceKey(r));
|
|
|
+ }
|
|
|
});
|
|
|
});
|
|
|
wsClient.exceptionHandler(ex -> {
|
|
@@ -79,6 +100,10 @@ public class WebSocketServerVerticle extends AbstractDeviceVerticle {
|
|
|
}).listen(webSocketConfig.getPort(), server -> {
|
|
|
if (server.succeeded()) {
|
|
|
log.info("webSocket server is listening on port " + webSocketConfig.getPort());
|
|
|
+ List<WebSocketServerConfig.AccessToken> tokenConfig= webSocketConfig.getAccessTokens();
|
|
|
+ for (WebSocketServerConfig.AccessToken obj:tokenConfig) {
|
|
|
+ tokens.put(obj.getTokenName(),obj.getTokenStr());
|
|
|
+ }
|
|
|
} else {
|
|
|
log.error("webSocket server on starting the server", server.cause());
|
|
|
}
|
|
@@ -92,6 +117,7 @@ public class WebSocketServerVerticle extends AbstractDeviceVerticle {
|
|
|
deviceKeyObj.put("deviceKey",deviceKey);
|
|
|
executor.onReceive(null, "disconnect", JsonUtil.toJsonString(deviceKeyObj));
|
|
|
}
|
|
|
+ tokens.clear();
|
|
|
httpServer.close(voidAsyncResult -> log.info("close webocket server..."));
|
|
|
}
|
|
|
|