|
@@ -4,29 +4,45 @@ import cc.iotkit.common.exception.BizException;
|
|
|
import cc.iotkit.common.utils.JsonUtil;
|
|
|
import cc.iotkit.comp.AbstractDeviceComponent;
|
|
|
import cc.iotkit.comp.CompConfig;
|
|
|
+import cc.iotkit.comp.IMessageHandler;
|
|
|
import cc.iotkit.comp.model.DeviceState;
|
|
|
+import cc.iotkit.comp.utils.SpringUtils;
|
|
|
import cc.iotkit.converter.DeviceMessage;
|
|
|
+import cc.iotkit.converter.ThingService;
|
|
|
+import cc.iotkit.dao.DeviceRepository;
|
|
|
+import cc.iotkit.model.device.DeviceInfo;
|
|
|
+import cc.iotkit.model.device.message.ThingModelMessage;
|
|
|
+import io.netty.handler.codec.mqtt.MqttQoS;
|
|
|
import io.vertx.core.Future;
|
|
|
import io.vertx.core.Vertx;
|
|
|
+import io.vertx.core.buffer.Buffer;
|
|
|
import io.vertx.mqtt.MqttClient;
|
|
|
import io.vertx.mqtt.MqttClientOptions;
|
|
|
-import io.vertx.mqtt.messages.MqttConnAckMessage;
|
|
|
-import lombok.SneakyThrows;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
+import lombok.*;
|
|
|
+import org.apache.commons.beanutils.BeanUtils;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+import java.lang.reflect.InvocationTargetException;
|
|
|
+import java.nio.charset.Charset;
|
|
|
+import java.util.*;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
|
|
|
-@Slf4j
|
|
|
+
|
|
|
public class EmqxDeviceComponent extends AbstractDeviceComponent {
|
|
|
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(EmqxDeviceComponent.class);
|
|
|
private Vertx vertx;
|
|
|
private AuthVerticle authVerticle;
|
|
|
private CountDownLatch countDownLatch;
|
|
|
private String deployedId;
|
|
|
private EmqxConfig mqttConfig;
|
|
|
+ private MqttClient client;
|
|
|
+
|
|
|
+ //组件mqtt clientId,默认通过mqtt auth / acl验证。
|
|
|
+ private final Set<String> compMqttClientIdList = new HashSet<>();
|
|
|
+
|
|
|
+ private final TransparentConverter transparentConverter = new TransparentConverter();
|
|
|
|
|
|
public void create(CompConfig config) {
|
|
|
super.create(config);
|
|
@@ -38,17 +54,21 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
|
|
|
@Override
|
|
|
public void start() {
|
|
|
try {
|
|
|
+ compMqttClientIdList.add(mqttConfig.getClientId());
|
|
|
+
|
|
|
authVerticle.setExecutor(getHandler());
|
|
|
countDownLatch = new CountDownLatch(1);
|
|
|
Future<String> future = vertx.deployVerticle(authVerticle);
|
|
|
future.onSuccess((s -> {
|
|
|
deployedId = s;
|
|
|
countDownLatch.countDown();
|
|
|
+ log.error("start emqx auth component success");
|
|
|
}));
|
|
|
future.onFailure((e) -> {
|
|
|
countDownLatch.countDown();
|
|
|
log.error("start emqx auth component failed", e);
|
|
|
});
|
|
|
+
|
|
|
countDownLatch.await();
|
|
|
|
|
|
MqttClientOptions options = new MqttClientOptions()
|
|
@@ -58,58 +78,64 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
|
|
|
.setCleanSession(true)
|
|
|
.setKeepAliveInterval(60);
|
|
|
|
|
|
+
|
|
|
if (mqttConfig.isSsl()) {
|
|
|
options.setSsl(true)
|
|
|
.setTrustAll(true);
|
|
|
}
|
|
|
- MqttClient client = MqttClient.create(vertx, options);
|
|
|
-
|
|
|
- Future<MqttConnAckMessage> connFuture =
|
|
|
- client.connect(mqttConfig.getPort(), mqttConfig.getBroker());
|
|
|
- connFuture.onSuccess(ack -> log.info("connect emqx broker success"))
|
|
|
- .onFailure(e -> log.error("connect emqx broker failed", e));
|
|
|
+ client = MqttClient.create(vertx, options);
|
|
|
|
|
|
List<String> topics = mqttConfig.getSubscribeTopics();
|
|
|
Map<String, Integer> subscribes = new HashMap<>();
|
|
|
+
|
|
|
for (String topic : topics) {
|
|
|
subscribes.put(topic, 1);
|
|
|
}
|
|
|
|
|
|
- client.publishHandler(s -> {
|
|
|
- String topic = s.topicName();
|
|
|
- String payload = s.payload().toString();
|
|
|
- log.info("receive message,topic:{},payload:{}", topic, payload);
|
|
|
-
|
|
|
-//
|
|
|
-// //取消订阅
|
|
|
-// if (topic.equals("/sys/session/topic/unsubscribed")) {
|
|
|
-// topicUnsubscribed(payload);
|
|
|
-// return;
|
|
|
-// }
|
|
|
-//
|
|
|
-// //连接断开
|
|
|
-// if (topic.equals("/sys/client/disconnected")) {
|
|
|
-// disconnectedHandler.handler(payload);
|
|
|
-// return;
|
|
|
-// }
|
|
|
-//
|
|
|
-// String[] parts = topic.split("/");
|
|
|
-// if (parts.length < 5) {
|
|
|
-// log.error("message topic is illegal.");
|
|
|
-// return;
|
|
|
-// }
|
|
|
-// String productKey = parts[2];
|
|
|
-// String deviceName = parts[3];
|
|
|
-//
|
|
|
-// //子设备注册
|
|
|
-// if (topic.endsWith("/register")) {
|
|
|
+ /*subscribes.put("/sys/+/+/s/#", 1);
|
|
|
+ subscribes.put("/sys/client/connected", 1);
|
|
|
+ subscribes.put("/sys/client/disconnected", 1);
|
|
|
+ subscribes.put("/sys/session/subscribed", 1);
|
|
|
+ subscribes.put("/sys/session/unsubscribed", 1);*/
|
|
|
|
|
|
|
|
|
+ // handler will be called when we have a message in topic we subscribe for
|
|
|
+ client.publishHandler(p -> {
|
|
|
+ log.info("Client received message on [{}] payload [{}] with QoS [{}]", p.topicName(), p.payload().toString(Charset.defaultCharset()), p.qosLevel());
|
|
|
+
|
|
|
+ String topic = p.topicName();
|
|
|
+ String payload = p.payload().toString();
|
|
|
+
|
|
|
+ try {
|
|
|
+ IMessageHandler messageHandler = getHandler();
|
|
|
+ if (messageHandler != null) {
|
|
|
Map<String, Object> head = new HashMap<>();
|
|
|
head.put("topic", topic);
|
|
|
- getHandler().onReceive(head, "", payload);
|
|
|
- }).subscribe(subscribes).onSuccess(a -> log.info("subscribe topic success"))
|
|
|
- .onFailure(e -> log.error("subscribe topic failed", e));
|
|
|
+ messageHandler.onReceive(head, "", payload);
|
|
|
+
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("message is illegal.", e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ client.connect(mqttConfig.getPort(), mqttConfig.getBroker(), s -> {
|
|
|
+ if (s.succeeded()) {
|
|
|
+ log.info("client connect success.");
|
|
|
+ client.subscribe(subscribes, e -> {
|
|
|
+ if (e.succeeded()) {
|
|
|
+ log.info("===>subscribe success: {}", e.result());
|
|
|
+ } else {
|
|
|
+ log.error("===>subscribe fail: ", e.cause());
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ } else {
|
|
|
+ log.error("client connect fail: ", s.cause());
|
|
|
+ }
|
|
|
+ }).exceptionHandler(event -> {
|
|
|
+ log.error("client fail: ", event.getCause());
|
|
|
+ });
|
|
|
|
|
|
} catch (Throwable e) {
|
|
|
throw new BizException("start emqx auth component error", e);
|
|
@@ -122,6 +148,9 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
|
|
|
authVerticle.stop();
|
|
|
Future<Void> future = vertx.undeploy(deployedId);
|
|
|
future.onSuccess(unused -> log.info("stop emqx auth component success"));
|
|
|
+ client.disconnect()
|
|
|
+ .onSuccess(unused -> log.info("stop emqx component success"))
|
|
|
+ .onFailure(unused -> log.info("stop emqx component failure"));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -131,17 +160,85 @@ public class EmqxDeviceComponent extends AbstractDeviceComponent {
|
|
|
|
|
|
@Override
|
|
|
public void onDeviceStateChange(DeviceState state) {
|
|
|
-
|
|
|
+ DeviceState.Parent parent = state.getParent();
|
|
|
+ if (parent == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ DeviceRepository deviceRepository = SpringUtils.getBean(DeviceRepository.class);
|
|
|
+
|
|
|
+ DeviceInfo deviceInfo = deviceRepository.findByProductKeyAndDeviceName(state.getProductKey(), state.getDeviceName());
|
|
|
+ if (deviceInfo != null) {
|
|
|
+ boolean isOnline = DeviceState.STATE_ONLINE.equals(state.getState());
|
|
|
+ deviceInfo.getState().setOnline(isOnline);
|
|
|
+ if (!isOnline) {
|
|
|
+ deviceInfo.getState().setOfflineTime(System.currentTimeMillis());
|
|
|
+ }
|
|
|
+ if (isOnline) {
|
|
|
+ deviceInfo.getState().setOnlineTime(System.currentTimeMillis());
|
|
|
+ }
|
|
|
+ deviceRepository.save(deviceInfo);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void send(DeviceMessage message) {
|
|
|
+ Object obj = message.getContent();
|
|
|
+ if (!(obj instanceof Map)) {
|
|
|
+ throw new BizException("message content is not Map");
|
|
|
+ }
|
|
|
+ Message msg = new Message();
|
|
|
+ try {
|
|
|
+ //obj中的key,如果bean中有这个属性,就把这个key对应的value值赋给msg的属性
|
|
|
+ BeanUtils.populate(msg, (Map<String, ? extends Object>) obj);
|
|
|
+ } catch (Throwable e) {
|
|
|
+ throw new BizException("message content is incorrect");
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("publish topic:{},payload:{}", msg.getTopic(), msg.getPayload());
|
|
|
|
|
|
+ client.publish(msg.getTopic(),
|
|
|
+ Buffer.buffer(msg.getPayload()),
|
|
|
+ MqttQoS.AT_LEAST_ONCE,
|
|
|
+ false,
|
|
|
+ false);
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public boolean exist(String productKey, String deviceName) {
|
|
|
- return false;
|
|
|
+ /**
|
|
|
+ * 透传解码
|
|
|
+ */
|
|
|
+ public ThingModelMessage transparentDecode(Map<String, Object> msg) throws InvocationTargetException, IllegalAccessException {
|
|
|
+ TransparentMsg transparentMsg = new TransparentMsg();
|
|
|
+ BeanUtils.populate(transparentMsg, msg);
|
|
|
+ return transparentConverter.decode(transparentMsg);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 透传编码
|
|
|
+ */
|
|
|
+ public DeviceMessage transparentEncode(ThingService<?> service, cc.iotkit.converter.Device device) {
|
|
|
+ return transparentConverter.encode(service, device);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 提供js调用
|
|
|
+ */
|
|
|
+ public Object getCompMqttClientIdList() {
|
|
|
+ String[] result = compMqttClientIdList.toArray(new String[0]);
|
|
|
+ return JsonUtil.toJsonString(result);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Data
|
|
|
+ public static class Message {
|
|
|
+ private String topic;
|
|
|
+ private String payload;
|
|
|
}
|
|
|
|
|
|
+ @Data
|
|
|
+ @NoArgsConstructor
|
|
|
+ @AllArgsConstructor
|
|
|
+ @ToString
|
|
|
+ public static class Device {
|
|
|
+ private String productKey;
|
|
|
+ private String deviceName;
|
|
|
+ }
|
|
|
}
|