|
@@ -1,32 +1,50 @@
|
|
|
package cc.iotkit.comp.emqx;
|
|
|
|
|
|
+import cc.iotkit.common.Constants;
|
|
|
import cc.iotkit.common.exception.BizException;
|
|
|
import cc.iotkit.common.utils.JsonUtil;
|
|
|
import cc.iotkit.comp.AbstractDeviceComponent;
|
|
|
import cc.iotkit.comp.CompConfig;
|
|
|
+import cc.iotkit.comp.IMessageHandler;
|
|
|
import cc.iotkit.comp.model.DeviceState;
|
|
|
import cc.iotkit.converter.DeviceMessage;
|
|
|
+import cc.iotkit.converter.ThingService;
|
|
|
+import cc.iotkit.model.device.message.ThingModelMessage;
|
|
|
+import com.fasterxml.jackson.databind.JsonNode;
|
|
|
+import io.netty.handler.codec.mqtt.MqttQoS;
|
|
|
import io.vertx.core.Future;
|
|
|
import io.vertx.core.Vertx;
|
|
|
+import io.vertx.core.buffer.Buffer;
|
|
|
import io.vertx.mqtt.MqttClient;
|
|
|
import io.vertx.mqtt.MqttClientOptions;
|
|
|
-import io.vertx.mqtt.messages.MqttConnAckMessage;
|
|
|
-import lombok.SneakyThrows;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
+import lombok.*;
|
|
|
+import org.apache.commons.beanutils.BeanUtils;
|
|
|
+import org.apache.commons.lang3.RandomStringUtils;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import java.lang.reflect.InvocationTargetException;
|
|
|
+import java.nio.charset.Charset;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
|
|
|
-@Slf4j
|
|
|
+
|
|
|
public class EmqxDeviceComponent extends AbstractDeviceComponent {
|
|
|
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(EmqxDeviceComponent.class);
|
|
|
private Vertx vertx;
|
|
|
private AuthVerticle authVerticle;
|
|
|
+ //private MqttVerticle mqttVerticle;
|
|
|
private CountDownLatch countDownLatch;
|
|
|
private String deployedId;
|
|
|
private EmqxConfig mqttConfig;
|
|
|
+ MqttClient client;
|
|
|
+
|
|
|
+ private final Map<String, Device> deviceChildToParent = new HashMap<>();
|
|
|
+
|
|
|
+ private TransparentConverter transparentConverter = new TransparentConverter();
|
|
|
|
|
|
public void create(CompConfig config) {
|
|
|
super.create(config);
|
|
@@ -44,11 +62,13 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
|
|
|
future.onSuccess((s -> {
|
|
|
deployedId = s;
|
|
|
countDownLatch.countDown();
|
|
|
+ log.error("start emqx auth component success", s);
|
|
|
}));
|
|
|
future.onFailure((e) -> {
|
|
|
countDownLatch.countDown();
|
|
|
log.error("start emqx auth component failed", e);
|
|
|
});
|
|
|
+
|
|
|
countDownLatch.await();
|
|
|
|
|
|
MqttClientOptions options = new MqttClientOptions()
|
|
@@ -62,54 +82,134 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
|
|
|
options.setSsl(true)
|
|
|
.setTrustAll(true);
|
|
|
}
|
|
|
- MqttClient client = MqttClient.create(vertx, options);
|
|
|
+ client = MqttClient.create(vertx, options);
|
|
|
+
|
|
|
+
|
|
|
+ // handler will be called when we have a message in topic we subscribe for
|
|
|
+ /*client.publishHandler(p -> {
|
|
|
+ log.info("Client received message on [{}] payload [{}] with QoS [{}]", p.topicName(), p.payload().toString(Charset.defaultCharset()), p.qosLevel());
|
|
|
+ });*/
|
|
|
|
|
|
- Future<MqttConnAckMessage> connFuture =
|
|
|
- client.connect(mqttConfig.getPort(), mqttConfig.getBroker());
|
|
|
- connFuture.onSuccess(ack -> log.info("connect emqx broker success"))
|
|
|
- .onFailure(e -> log.error("connect emqx broker failed", e));
|
|
|
|
|
|
List<String> topics = mqttConfig.getSubscribeTopics();
|
|
|
Map<String, Integer> subscribes = new HashMap<>();
|
|
|
- for (String topic : topics) {
|
|
|
+
|
|
|
+ subscribes.put("/sys/+/+/s/#", 1);
|
|
|
+ subscribes.put("/sys/client/connected", 1);
|
|
|
+ subscribes.put("/sys/client/disconnected", 1);
|
|
|
+ subscribes.put("/sys/session/subscribed", 1);
|
|
|
+ subscribes.put("/sys/session/unsubscribed", 1);
|
|
|
+
|
|
|
+ //"/sys/+/+/s/#","/sys/client/disconnected"
|
|
|
+
|
|
|
+ /*for (String topic : topics) {
|
|
|
subscribes.put(topic, 1);
|
|
|
- }
|
|
|
+ }*/
|
|
|
+
|
|
|
+ // handler will be called when we have a message in topic we subscribe for
|
|
|
+ client.publishHandler(p -> {
|
|
|
+ log.info("Client received message on [{}] payload [{}] with QoS [{}]", p.topicName(), p.payload().toString(Charset.defaultCharset()), p.qosLevel());
|
|
|
|
|
|
- client.publishHandler(s -> {
|
|
|
- String topic = s.topicName();
|
|
|
- String payload = s.payload().toString();
|
|
|
- log.info("receive message,topic:{},payload:{}", topic, payload);
|
|
|
-
|
|
|
-//
|
|
|
-// //取消订阅
|
|
|
-// if (topic.equals("/sys/session/topic/unsubscribed")) {
|
|
|
-// topicUnsubscribed(payload);
|
|
|
-// return;
|
|
|
-// }
|
|
|
-//
|
|
|
-// //连接断开
|
|
|
-// if (topic.equals("/sys/client/disconnected")) {
|
|
|
-// disconnectedHandler.handler(payload);
|
|
|
-// return;
|
|
|
-// }
|
|
|
-//
|
|
|
-// String[] parts = topic.split("/");
|
|
|
-// if (parts.length < 5) {
|
|
|
-// log.error("message topic is illegal.");
|
|
|
-// return;
|
|
|
-// }
|
|
|
-// String productKey = parts[2];
|
|
|
-// String deviceName = parts[3];
|
|
|
-//
|
|
|
-// //子设备注册
|
|
|
-// if (topic.endsWith("/register")) {
|
|
|
+ String topic = p.topicName();
|
|
|
+ String payload = p.payload().toString();
|
|
|
|
|
|
+ try {
|
|
|
+ IMessageHandler messageHandler = getHandler();
|
|
|
|
|
|
+ if (messageHandler != null) {
|
|
|
Map<String, Object> head = new HashMap<>();
|
|
|
head.put("topic", topic);
|
|
|
- getHandler().onReceive(head, "", payload);
|
|
|
- }).subscribe(subscribes).onSuccess(a -> log.info("subscribe topic success"))
|
|
|
- .onFailure(e -> log.error("subscribe topic failed", e));
|
|
|
+ if (topic.equals("/sys/client/connected")) {
|
|
|
+ JsonNode payloadJson = JsonUtil.parse(payload);
|
|
|
+ String clientId = payloadJson.get("clientid").textValue();
|
|
|
+ log.warn("client connection connected,clientId:{}", clientId);
|
|
|
+ head.put("clientId", clientId);
|
|
|
+ messageHandler.onReceive(head, "connect", payload);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ //连接断开
|
|
|
+ if (topic.equals("/sys/client/disconnected")) {
|
|
|
+ JsonNode payloadJson = JsonUtil.parse(payload);
|
|
|
+ String clientId = payloadJson.get("clientid").textValue();
|
|
|
+ log.warn("client connection closed,clientId:{}", clientId);
|
|
|
+ head.put("clientId", clientId);
|
|
|
+ messageHandler.onReceive(head, "disconnect", payload);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ ** 子设备在线离线状态(topic: ^/sys/.+/.+/c/#$)**: 改为从 从 acl 访问控制 获取离线在线状态。
|
|
|
+
|
|
|
+
|
|
|
+ if (topic.equals("/sys/session/subscribed")) {
|
|
|
+ JsonNode payloadJson = JsonUtil.parse(payload);
|
|
|
+ String _topic = payloadJson.get("topic").textValue();
|
|
|
+
|
|
|
+ //在线
|
|
|
+ if (_topic.matches(Constants.MQTT.DEVICE_SUBSCRIBE_TOPIC)) {
|
|
|
+ //head.put("topic", _topic);
|
|
|
+ String clientId = payloadJson.get("clientid").textValue();
|
|
|
+ log.warn("session subscribe, topic:{}", _topic);
|
|
|
+ head.put("clientId", clientId);
|
|
|
+ messageHandler.onReceive(head, "subscribe", payload);
|
|
|
+ }
|
|
|
+
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ if (topic.equals("/sys/session/unsubscribed")) {
|
|
|
+ JsonNode payloadJson = JsonUtil.parse(payload);
|
|
|
+ String _topic = payloadJson.get("topic").textValue();
|
|
|
+
|
|
|
+ //离线
|
|
|
+ if (_topic.matches(Constants.MQTT.DEVICE_SUBSCRIBE_TOPIC)) {
|
|
|
+ //head.put("topic", _topic);
|
|
|
+ String clientId = payloadJson.get("clientid").textValue();
|
|
|
+ log.warn("session unsubscribe, topic:{}", _topic);
|
|
|
+ head.put("clientId", clientId);
|
|
|
+ messageHandler.onReceive(head, "unsubscribe", payload);
|
|
|
+ }
|
|
|
+
|
|
|
+ return;
|
|
|
+ }*/
|
|
|
+
|
|
|
+ String[] parts = topic.split("/");
|
|
|
+ if (parts.length < 5) {
|
|
|
+ log.error("message topic is illegal.");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ messageHandler.onReceive(head, "", payload);
|
|
|
+
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("message topic is illegal.", e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ client.connect(mqttConfig.getPort(), mqttConfig.getBroker(), s -> {
|
|
|
+ if (s.succeeded()) {
|
|
|
+ log.info("client connect success.");
|
|
|
+ client.subscribe(subscribes, e -> {
|
|
|
+ if (e.succeeded()) {
|
|
|
+ log.info("===>subscribe success: {}", e.result());
|
|
|
+ } else {
|
|
|
+ log.error("===>subscribe fail: ", e.cause());
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ } else {
|
|
|
+ log.error("client connect fail: ", s.cause());
|
|
|
+ }
|
|
|
+ }).exceptionHandler(event -> {
|
|
|
+ log.error("client fail: ", event.getCause());
|
|
|
+ });
|
|
|
+
|
|
|
+ /** client.pingResponseHandler(s -> {
|
|
|
+ log.info("We have just received PINGRESP packet");
|
|
|
+ });*/
|
|
|
|
|
|
} catch (Throwable e) {
|
|
|
throw new BizException("start emqx auth component error", e);
|
|
@@ -122,6 +222,9 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
|
|
|
authVerticle.stop();
|
|
|
Future<Void> future = vertx.undeploy(deployedId);
|
|
|
future.onSuccess(unused -> log.info("stop emqx auth component success"));
|
|
|
+ client.disconnect()
|
|
|
+ .onSuccess(unused -> log.info("stop emqx component success"))
|
|
|
+ .onFailure(unused -> log.info("stop emqx component failure"));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -131,17 +234,95 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
|
|
|
|
|
|
@Override
|
|
|
public void onDeviceStateChange(DeviceState state) {
|
|
|
+ DeviceState.Parent parent = state.getParent();
|
|
|
+ if (parent == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Device device = new Device(state.getProductKey(), state.getDeviceName());
|
|
|
|
|
|
+ if (DeviceState.STATE_ONLINE.equals(state.getState())) {
|
|
|
+ //保存子设备所属父设备
|
|
|
+ deviceChildToParent.put(device.toString(),
|
|
|
+ new Device(parent.getProductKey(), parent.getDeviceName())
|
|
|
+ );
|
|
|
+ } else {
|
|
|
+ //删除关系
|
|
|
+ deviceChildToParent.remove(device.toString());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void send(DeviceMessage message) {
|
|
|
+ Device child = new Device(message.getProductKey(), message.getDeviceName());
|
|
|
+ //作为子设备查找父设备
|
|
|
+ Device parent = deviceChildToParent.get(child.toString());
|
|
|
+ if (parent == null) {
|
|
|
+ parent = child;
|
|
|
+ }
|
|
|
+
|
|
|
+ Object obj = message.getContent();
|
|
|
+ if (!(obj instanceof Map)) {
|
|
|
+ throw new BizException("message content is not Map");
|
|
|
+ }
|
|
|
+ Message msg = new Message();
|
|
|
+ try {
|
|
|
+ //obj中的key,如果bean中有这个属性,就把这个key对应的value值赋给msg的属性
|
|
|
+ BeanUtils.populate(msg, (Map<String, ? extends Object>) obj);
|
|
|
+ } catch (Throwable e) {
|
|
|
+ throw new BizException("message content is incorrect");
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("publish topic:{},payload:{}", msg.getTopic(), msg.getPayload());
|
|
|
|
|
|
+ client.publish(msg.getTopic(),
|
|
|
+ Buffer.buffer(msg.getPayload()),
|
|
|
+ MqttQoS.AT_LEAST_ONCE,
|
|
|
+ false,
|
|
|
+ false);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public boolean exist(String productKey, String deviceName) {
|
|
|
- return false;
|
|
|
+ return true;
|
|
|
+
|
|
|
+ /*//先作为子设备查找是否存在父设备
|
|
|
+ Device device = deviceChildToParent.get(new Device(productKey, deviceName).toString());
|
|
|
+ if (device != null) {
|
|
|
+ return true;
|
|
|
+ }*/
|
|
|
+
|
|
|
+ //return mqttVerticle.exist(productKey, deviceName);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 透传解码
|
|
|
+ */
|
|
|
+ public ThingModelMessage transparentDecode(Map<String, Object> msg) throws InvocationTargetException, IllegalAccessException {
|
|
|
+ TransparentMsg transparentMsg = new TransparentMsg();
|
|
|
+ BeanUtils.populate(transparentMsg, msg);
|
|
|
+ return transparentConverter.decode(transparentMsg);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 透传编码
|
|
|
+ */
|
|
|
+ public DeviceMessage transparentEncode(ThingService<?> service, cc.iotkit.converter.Device device) {
|
|
|
+ return transparentConverter.encode(service, device);
|
|
|
}
|
|
|
|
|
|
+ @Data
|
|
|
+ public static class Message {
|
|
|
+ private String topic;
|
|
|
+ private String payload;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Data
|
|
|
+ @NoArgsConstructor
|
|
|
+ @AllArgsConstructor
|
|
|
+ @ToString
|
|
|
+ public static class Device {
|
|
|
+ private String productKey;
|
|
|
+ private String deviceName;
|
|
|
+ }
|
|
|
}
|