Эх сурвалжийг харах

1.规则启动MQTT连接记录清除失败Bug
2.mqtt重连机制(vert.x 不像paho只带重连)
3.mqtt、kafka消息发送阻塞响应

huangwenlong 2 жил өмнө
parent
commit
bfeecd37f2

+ 16 - 4
iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/impl/KafkaLink.java

@@ -2,9 +2,11 @@ package cc.iotkit.ruleengine.link.impl;
 
 import cc.iotkit.common.utils.FIUtil;
 import cc.iotkit.ruleengine.link.BaseSinkLink;
+import io.vertx.core.Future;
 import io.vertx.core.Vertx;
 import io.vertx.kafka.client.producer.KafkaProducer;
 import io.vertx.kafka.client.producer.KafkaProducerRecord;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringSerializer;
 
@@ -21,6 +23,7 @@ import java.util.function.Consumer;
  * @author huangwenl
  * @date 2022-11-11
  */
+@Slf4j
 public class KafkaLink implements BaseSinkLink {
     public static final String LINK_TYPE = "kafka";
     public static final String TOPIC = "topic";
@@ -60,16 +63,25 @@ public class KafkaLink implements BaseSinkLink {
         FIUtil.isTotF(msg.containsKey(PARTITION)).handler(
                 () -> record.set(KafkaProducerRecord.create((String) msg.get(TOPIC), "", msg.get(PAYLOAD).toString(), (Integer) msg.get(PARTITION))),
                 () -> record.set(KafkaProducerRecord.create((String) msg.get(TOPIC), msg.get(PAYLOAD).toString())));
-        // todo 异步发送(不能确认是否成功)
-        producer.write(record.get());
-        consumer.accept(String.format("kafka,topic[%s],发送成功:%s", msg.get(TOPIC), msg.get(PAYLOAD).toString()));
+        // 同步等待结果
+        try {
+            Future<Void> write = producer.write(record.get());
+            write.toCompletionStage().toCompletableFuture().get();
+            FIUtil.isTotF(write.succeeded()).handler(
+                    () -> consumer.accept(String.format("kafka,topic[%s],发送成功:%s", msg.get(TOPIC), msg.get(PAYLOAD).toString())),
+                    () -> consumer.accept(String.format("kafka,topic[%s],发送失败:%s", msg.get(TOPIC), msg.get(PAYLOAD).toString()))
+            );
+        } catch (Exception e) {
+            e.printStackTrace();
+            consumer.accept(String.format("kafka,topic[%s],发送异常:%s", msg.get(TOPIC), msg.get(PAYLOAD).toString()));
+        }
+
     }
 
     @Override
     public void close() {
         try {
             producer.close();
-            producer = null;
         } catch (Exception e) {
             e.printStackTrace();
         } finally {

+ 46 - 8
iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/impl/MqttClientLink.java

@@ -3,10 +3,12 @@ package cc.iotkit.ruleengine.link.impl;
 import cc.iotkit.common.utils.FIUtil;
 import cc.iotkit.ruleengine.link.BaseSinkLink;
 import io.netty.handler.codec.mqtt.MqttQoS;
+import io.vertx.core.Future;
 import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.mqtt.MqttClient;
 import io.vertx.mqtt.MqttClientOptions;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
@@ -16,6 +18,7 @@ import java.util.function.Consumer;
  * @author huangwenl
  * @date 2022-11-10
  */
+@Slf4j
 public class MqttClientLink implements BaseSinkLink {
     public static final String LINK_TYPE = "mqtt";
     public static final String TOPIC = "topic";
@@ -27,6 +30,10 @@ public class MqttClientLink implements BaseSinkLink {
 
     private MqttClient mqttClient;
     private Consumer<Void> closeHandler;
+    private MqttClientOptions clientOptions;
+    private String host;
+    private int port;
+    private boolean connecting;
 
 
     @Override
@@ -37,14 +44,24 @@ public class MqttClientLink implements BaseSinkLink {
                     () -> vertx.set(Vertx.vertx()),
                     () -> vertx.set(Vertx.currentContext().owner())
             );
-            MqttClientOptions clientOptions = new MqttClientOptions();
+            clientOptions = new MqttClientOptions();
             clientOptions.setUsername((String) config.get(USERNAME));
             clientOptions.setPassword((String) config.get(PASSWORD));
             mqttClient = MqttClient.create(vertx.get(), clientOptions);
-            mqttClient.connect((int) config.get(PORT), (String) config.get(HOST));
-            mqttClient.closeHandler(Void -> closeHandler.accept(null));
+            host = (String) config.get(HOST);
+            port = (int) config.get(PORT);
+            mqttClient = MqttClient.create(vertx.get(), clientOptions);
+            connecting = true;
+            mqttClient.connect(port, host,
+                    s -> {
+                        connecting = false;
+                        if (!s.succeeded()) {
+                            closeHandler.accept(null);
+                        }
+                    });
         } catch (Exception e) {
             e.printStackTrace();
+            connecting = false;
             return false;
         }
         return true;
@@ -54,18 +71,39 @@ public class MqttClientLink implements BaseSinkLink {
     public void send(Map<String, Object> msg, Consumer<String> consumer) {
         FIUtil.isTotF(mqttClient.isConnected()).handler(
                 () -> {
-                    mqttClient.publish((String) msg.get(TOPIC),
+                    Future<Integer> publish = mqttClient.publish((String) msg.get(TOPIC),
                             Buffer.buffer(msg.get(PAYLOAD).toString()),
                             MqttQoS.AT_MOST_ONCE, false, false);
-                    consumer.accept(String.format("mqtt, topic:[%s],发送成功:,%s", msg.get(TOPIC), msg.get(PAYLOAD).toString()));
+                    try {
+                        publish.toCompletionStage().toCompletableFuture().get();
+                        FIUtil.isTotF(publish.succeeded()).handler(
+                                () -> consumer.accept(String.format("mqtt,topic[%s],发送成功:%s", msg.get(TOPIC), msg.get(PAYLOAD).toString())),
+                                () -> consumer.accept(String.format("mqtt,topic[%s],发送失败:%s", msg.get(TOPIC), msg.get(PAYLOAD).toString()))
+                        );
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        consumer.accept(String.format("mqtt,topic[%s],发送异常:%s", msg.get(TOPIC), msg.get(PAYLOAD).toString()));
+                    }
                 },
-                () -> consumer.accept("mqtt,连接断开,发送失败"));
+                () -> {
+                    consumer.accept("mqtt,连接断开,发送失败");
+                    if (!connecting) {
+                        log.info("mqtt重连!");
+                        connecting = true;
+                        mqttClient.connect(port, host, s -> connecting = false);
+                    }
+                });
     }
 
     @Override
     public void close() {
-        mqttClient.disconnect();
-        mqttClient = null;
+        try {
+            mqttClient.disconnect();
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            closeHandler.accept(null);
+        }
     }
 
     @Override