|
@@ -7,6 +7,7 @@ import cc.iotkit.comp.AbstractDeviceComponent;
|
|
import cc.iotkit.comp.CompConfig;
|
|
import cc.iotkit.comp.CompConfig;
|
|
import cc.iotkit.comp.IMessageHandler;
|
|
import cc.iotkit.comp.IMessageHandler;
|
|
import cc.iotkit.comp.model.DeviceState;
|
|
import cc.iotkit.comp.model.DeviceState;
|
|
|
|
+import cc.iotkit.comp.utils.SpringUtils;
|
|
import cc.iotkit.converter.DeviceMessage;
|
|
import cc.iotkit.converter.DeviceMessage;
|
|
import cc.iotkit.converter.ThingService;
|
|
import cc.iotkit.converter.ThingService;
|
|
import cc.iotkit.model.device.message.ThingModelMessage;
|
|
import cc.iotkit.model.device.message.ThingModelMessage;
|
|
@@ -19,15 +20,14 @@ import io.vertx.mqtt.MqttClient;
|
|
import io.vertx.mqtt.MqttClientOptions;
|
|
import io.vertx.mqtt.MqttClientOptions;
|
|
import lombok.*;
|
|
import lombok.*;
|
|
import org.apache.commons.beanutils.BeanUtils;
|
|
import org.apache.commons.beanutils.BeanUtils;
|
|
|
|
+import org.apache.commons.collections.CollectionUtils;
|
|
import org.apache.commons.lang3.RandomStringUtils;
|
|
import org.apache.commons.lang3.RandomStringUtils;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.lang.reflect.InvocationTargetException;
|
|
import java.lang.reflect.InvocationTargetException;
|
|
import java.nio.charset.Charset;
|
|
import java.nio.charset.Charset;
|
|
-import java.util.HashMap;
|
|
|
|
-import java.util.List;
|
|
|
|
-import java.util.Map;
|
|
|
|
|
|
+import java.util.*;
|
|
import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
|
|
|
|
|
|
|
@@ -42,6 +42,10 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
|
|
private EmqxConfig mqttConfig;
|
|
private EmqxConfig mqttConfig;
|
|
MqttClient client;
|
|
MqttClient client;
|
|
|
|
|
|
|
|
+ //组件mqtt clientId,默认通过mqtt auth验证。
|
|
|
|
+ private Set<String> compMqttClientIdList = new HashSet<>();
|
|
|
|
+
|
|
|
|
+
|
|
private final Map<String, Device> deviceChildToParent = new HashMap<>();
|
|
private final Map<String, Device> deviceChildToParent = new HashMap<>();
|
|
|
|
|
|
private TransparentConverter transparentConverter = new TransparentConverter();
|
|
private TransparentConverter transparentConverter = new TransparentConverter();
|
|
@@ -56,6 +60,8 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
|
|
@Override
|
|
@Override
|
|
public void start() {
|
|
public void start() {
|
|
try {
|
|
try {
|
|
|
|
+ compMqttClientIdList.add(mqttConfig.getClientId());
|
|
|
|
+
|
|
authVerticle.setExecutor(getHandler());
|
|
authVerticle.setExecutor(getHandler());
|
|
countDownLatch = new CountDownLatch(1);
|
|
countDownLatch = new CountDownLatch(1);
|
|
Future<String> future = vertx.deployVerticle(authVerticle);
|
|
Future<String> future = vertx.deployVerticle(authVerticle);
|
|
@@ -78,19 +84,18 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
|
|
.setCleanSession(true)
|
|
.setCleanSession(true)
|
|
.setKeepAliveInterval(60);
|
|
.setKeepAliveInterval(60);
|
|
|
|
|
|
|
|
+
|
|
if (mqttConfig.isSsl()) {
|
|
if (mqttConfig.isSsl()) {
|
|
options.setSsl(true)
|
|
options.setSsl(true)
|
|
.setTrustAll(true);
|
|
.setTrustAll(true);
|
|
}
|
|
}
|
|
client = MqttClient.create(vertx, options);
|
|
client = MqttClient.create(vertx, options);
|
|
|
|
|
|
-
|
|
|
|
// handler will be called when we have a message in topic we subscribe for
|
|
// handler will be called when we have a message in topic we subscribe for
|
|
/*client.publishHandler(p -> {
|
|
/*client.publishHandler(p -> {
|
|
log.info("Client received message on [{}] payload [{}] with QoS [{}]", p.topicName(), p.payload().toString(Charset.defaultCharset()), p.qosLevel());
|
|
log.info("Client received message on [{}] payload [{}] with QoS [{}]", p.topicName(), p.payload().toString(Charset.defaultCharset()), p.qosLevel());
|
|
});*/
|
|
});*/
|
|
|
|
|
|
-
|
|
|
|
List<String> topics = mqttConfig.getSubscribeTopics();
|
|
List<String> topics = mqttConfig.getSubscribeTopics();
|
|
Map<String, Integer> subscribes = new HashMap<>();
|
|
Map<String, Integer> subscribes = new HashMap<>();
|
|
|
|
|
|
@@ -115,77 +120,14 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
|
|
|
|
|
|
try {
|
|
try {
|
|
IMessageHandler messageHandler = getHandler();
|
|
IMessageHandler messageHandler = getHandler();
|
|
-
|
|
|
|
if (messageHandler != null) {
|
|
if (messageHandler != null) {
|
|
Map<String, Object> head = new HashMap<>();
|
|
Map<String, Object> head = new HashMap<>();
|
|
head.put("topic", topic);
|
|
head.put("topic", topic);
|
|
- if (topic.equals("/sys/client/connected")) {
|
|
|
|
- JsonNode payloadJson = JsonUtil.parse(payload);
|
|
|
|
- String clientId = payloadJson.get("clientid").textValue();
|
|
|
|
- log.warn("client connection connected,clientId:{}", clientId);
|
|
|
|
- head.put("clientId", clientId);
|
|
|
|
- messageHandler.onReceive(head, "connect", payload);
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- //连接断开
|
|
|
|
- if (topic.equals("/sys/client/disconnected")) {
|
|
|
|
- JsonNode payloadJson = JsonUtil.parse(payload);
|
|
|
|
- String clientId = payloadJson.get("clientid").textValue();
|
|
|
|
- log.warn("client connection closed,clientId:{}", clientId);
|
|
|
|
- head.put("clientId", clientId);
|
|
|
|
- messageHandler.onReceive(head, "disconnect", payload);
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- ** 子设备在线离线状态(topic: ^/sys/.+/.+/c/#$)**: 改为从 从 acl 访问控制 获取离线在线状态。
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- if (topic.equals("/sys/session/subscribed")) {
|
|
|
|
- JsonNode payloadJson = JsonUtil.parse(payload);
|
|
|
|
- String _topic = payloadJson.get("topic").textValue();
|
|
|
|
-
|
|
|
|
- //在线
|
|
|
|
- if (_topic.matches(Constants.MQTT.DEVICE_SUBSCRIBE_TOPIC)) {
|
|
|
|
- //head.put("topic", _topic);
|
|
|
|
- String clientId = payloadJson.get("clientid").textValue();
|
|
|
|
- log.warn("session subscribe, topic:{}", _topic);
|
|
|
|
- head.put("clientId", clientId);
|
|
|
|
- messageHandler.onReceive(head, "subscribe", payload);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- if (topic.equals("/sys/session/unsubscribed")) {
|
|
|
|
- JsonNode payloadJson = JsonUtil.parse(payload);
|
|
|
|
- String _topic = payloadJson.get("topic").textValue();
|
|
|
|
-
|
|
|
|
- //离线
|
|
|
|
- if (_topic.matches(Constants.MQTT.DEVICE_SUBSCRIBE_TOPIC)) {
|
|
|
|
- //head.put("topic", _topic);
|
|
|
|
- String clientId = payloadJson.get("clientid").textValue();
|
|
|
|
- log.warn("session unsubscribe, topic:{}", _topic);
|
|
|
|
- head.put("clientId", clientId);
|
|
|
|
- messageHandler.onReceive(head, "unsubscribe", payload);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return;
|
|
|
|
- }*/
|
|
|
|
-
|
|
|
|
- String[] parts = topic.split("/");
|
|
|
|
- if (parts.length < 5) {
|
|
|
|
- log.error("message topic is illegal.");
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
messageHandler.onReceive(head, "", payload);
|
|
messageHandler.onReceive(head, "", payload);
|
|
|
|
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- log.error("message topic is illegal.", e);
|
|
|
|
|
|
+ log.error("message is illegal.", e);
|
|
}
|
|
}
|
|
});
|
|
});
|
|
|
|
|
|
@@ -207,10 +149,6 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
|
|
log.error("client fail: ", event.getCause());
|
|
log.error("client fail: ", event.getCause());
|
|
});
|
|
});
|
|
|
|
|
|
- /** client.pingResponseHandler(s -> {
|
|
|
|
- log.info("We have just received PINGRESP packet");
|
|
|
|
- });*/
|
|
|
|
-
|
|
|
|
} catch (Throwable e) {
|
|
} catch (Throwable e) {
|
|
throw new BizException("start emqx auth component error", e);
|
|
throw new BizException("start emqx auth component error", e);
|
|
}
|
|
}
|
|
@@ -311,6 +249,11 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
|
|
return transparentConverter.encode(service, device);
|
|
return transparentConverter.encode(service, device);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public Object getCompMqttClientIdList(){
|
|
|
|
+ String[] result = compMqttClientIdList.toArray(new String[0]);
|
|
|
|
+ return JsonUtil.toJsonString(result);
|
|
|
|
+ }
|
|
|
|
+
|
|
@Data
|
|
@Data
|
|
public static class Message {
|
|
public static class Message {
|
|
private String topic;
|
|
private String topic;
|