Selaa lähdekoodia

数据流转:
1.mqtt数据流转(北向服务端)
script:
this.translate=function(msg,device){
return
{
"topic":"topic",//推送topic
"payload":JSON.stringify(msg.data) // 数据,为字符串
}
}
2.kafka数据流转(北向服务端
script:
this.translate=function(msg,device){
return
{
"topic":"topic",//推送topic
"payload":JSON.stringify(msg.data), // 数据,为字符串
"partition": 1 // 分区,不需要指定分区就不要传分区kv
}
}

huangwenlong 2 vuotta sitten
vanhempi
commit
c8e49a65d9

+ 11 - 0
iot-common/src/main/java/cc/iotkit/common/function/IfHandler.java

@@ -0,0 +1,11 @@
+package cc.iotkit.common.function;
+
+/**
+ * @author huangwenl
+ * @date 2022-11-10
+ */
+@FunctionalInterface
+public interface IfHandler {
+
+    void handler(Runnable tHandler, Runnable fHandler);
+}

+ 21 - 0
iot-common/src/main/java/cc/iotkit/common/utils/FIUtil.java

@@ -0,0 +1,21 @@
+package cc.iotkit.common.utils;
+
+import cc.iotkit.common.function.IfHandler;
+
+/**
+ * @author huangwenl
+ * @date 2022-11-10
+ */
+public class FIUtil {
+
+
+    public static IfHandler isTotF(boolean param) {
+        return (tHandler, fHandler) -> {
+            if (param) {
+                tHandler.run();
+            } else {
+                fHandler.run();
+            }
+        };
+    }
+}

+ 11 - 0
iot-rule-engine/pom.xml

@@ -71,6 +71,17 @@
             <artifactId>iot-message-core</artifactId>
         </dependency>
 
+        <!--        mqtt-->
+        <dependency>
+            <groupId>io.vertx</groupId>
+            <artifactId>vertx-mqtt</artifactId>
+        </dependency>
+
+        <!--        kafka-->
+        <dependency>
+            <groupId>io.vertx</groupId>
+            <artifactId>vertx-kafka-client</artifactId>
+        </dependency>
     </dependencies>
 
     <properties>

+ 40 - 0
iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/kafka/KafkaAction.java

@@ -0,0 +1,40 @@
+package cc.iotkit.ruleengine.action.kafka;
+
+import cc.iotkit.model.device.message.ThingModelMessage;
+import cc.iotkit.ruleengine.action.Action;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author huangwenl
+ * @date 2022-11-11
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+public class KafkaAction implements Action<KafkaService> {
+
+    public static final String TYPE = "kafka";
+
+
+    private List<KafkaService> services;
+
+    @Override
+    public String getType() {
+        return TYPE;
+    }
+
+
+    @Override
+    public List<String> execute(ThingModelMessage msg) {
+        List<String> results = new ArrayList<>();
+        for (KafkaService service : services) {
+            results.add(service.execute(msg));
+        }
+        return results;
+    }
+}

+ 64 - 0
iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/kafka/KafkaService.java

@@ -0,0 +1,64 @@
+package cc.iotkit.ruleengine.action.kafka;
+
+import cc.iotkit.common.utils.FIUtil;
+import cc.iotkit.model.device.message.ThingModelMessage;
+import cc.iotkit.ruleengine.action.ScriptService;
+import cc.iotkit.ruleengine.link.LinkFactory;
+import cc.iotkit.ruleengine.link.LinkService;
+import cc.iotkit.ruleengine.link.impl.KafkaLink;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * @author huangwenl
+ * @date 2022-11-11
+ */
+@EqualsAndHashCode(callSuper = true)
+@Slf4j
+@Data
+public class KafkaService extends ScriptService implements LinkService {
+
+    private String services;
+    private String ack;
+
+    public String execute(ThingModelMessage msg) {
+        //执行转换脚本
+        Map result = execScript(msg);
+        if (result == null) {
+            log.warn("execScript result is null");
+            return "execScript result is null";
+        }
+        boolean initResult = LinkFactory.initLink(getKey(), KafkaLink.LINK_TYPE, getLinkConf());
+
+        AtomicReference<String> data = new AtomicReference<>("");
+        FIUtil.isTotF(initResult).handler(
+                () -> LinkFactory.sendMsg(getKey(), result, data::set),
+                () -> data.set("创建连接失败!")
+        );
+
+        return data.get();
+    }
+
+    @Override
+    public String getKey() {
+        return String.format("kafka_%s", services);
+    }
+
+    @Override
+    public String getLinkType() {
+        return KafkaLink.LINK_TYPE;
+    }
+
+    @Override
+    public Map<String, Object> getLinkConf() {
+        Map<String, Object> config = new HashMap<>();
+        config.put(KafkaLink.SERVERS, services);
+        config.put(KafkaLink.ACK, ack);
+        return config;
+    }
+}

+ 39 - 0
iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/mqtt/MqttAction.java

@@ -0,0 +1,39 @@
+package cc.iotkit.ruleengine.action.mqtt;
+
+import cc.iotkit.model.device.message.ThingModelMessage;
+import cc.iotkit.ruleengine.action.Action;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author huangwenl
+ * @date 2022-11-10
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+public class MqttAction implements Action<MqttService> {
+    public static final String TYPE = "mqtt";
+
+
+    private List<MqttService> services;
+
+    @Override
+    public String getType() {
+        return TYPE;
+    }
+
+
+    @Override
+    public List<String> execute(ThingModelMessage msg) {
+        List<String> results = new ArrayList<>();
+        for (MqttService service : services) {
+            results.add(service.execute(msg));
+        }
+        return results;
+    }
+}

+ 68 - 0
iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/mqtt/MqttService.java

@@ -0,0 +1,68 @@
+package cc.iotkit.ruleengine.action.mqtt;
+
+import cc.iotkit.common.utils.FIUtil;
+import cc.iotkit.model.device.message.ThingModelMessage;
+import cc.iotkit.ruleengine.action.ScriptService;
+import cc.iotkit.ruleengine.link.LinkFactory;
+import cc.iotkit.ruleengine.link.LinkService;
+import cc.iotkit.ruleengine.link.impl.MqttClientLink;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * @author huangwenl
+ * @date 2022-11-09
+ */
+
+@EqualsAndHashCode(callSuper = true)
+@Slf4j
+@Data
+public class MqttService extends ScriptService implements LinkService {
+
+    private String username;
+    private String password;
+    private String host;
+    private int port;
+
+    public String execute(ThingModelMessage msg) {
+        //执行转换脚本
+        Map result = execScript(msg);
+        if (result == null) {
+            log.warn("execScript result is null");
+            return "execScript result is null";
+        }
+        boolean initResult = LinkFactory.initLink(getKey(), MqttClientLink.LINK_TYPE, getLinkConf());
+
+        AtomicReference<String> data = new AtomicReference<>("");
+        FIUtil.isTotF(initResult).handler(
+                () -> LinkFactory.sendMsg(getKey(), result, data::set),
+                () -> data.set("创建连接失败!")
+        );
+        return data.get();
+    }
+
+    @Override
+    public String getKey() {
+        return String.format("mqtt_%s_%d", host, port);
+    }
+
+    @Override
+    public String getLinkType() {
+        return MqttClientLink.LINK_TYPE;
+    }
+
+    @Override
+    public Map<String, Object> getLinkConf() {
+        Map<String, Object> config = new HashMap<>();
+        config.put(MqttClientLink.HOST, host);
+        config.put(MqttClientLink.PORT, port);
+        config.put(MqttClientLink.USERNAME, username);
+        config.put(MqttClientLink.PASSWORD, password);
+        return config;
+    }
+}

+ 35 - 0
iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/BaseSinkLink.java

@@ -0,0 +1,35 @@
+package cc.iotkit.ruleengine.link;
+
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * @author huangwenl
+ * @date 2022-11-10
+ */
+public interface BaseSinkLink {
+
+    /**
+     * 建立连接
+     * @param config  连接配置信息
+     */
+    boolean open(Map<String, Object> config);
+
+    /**
+     * 发送消息
+     * @param msg 消息内容
+     * @param consumer  发送回调
+     */
+    void send(Map<String, Object> msg, Consumer<String> consumer);
+
+    /**
+     * 关闭连接
+     */
+    void close();
+
+    /**
+     * 连接监听
+     * @param closeHandler
+     */
+    void closeHandler(Consumer<Void> closeHandler);
+}

+ 146 - 0
iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/LinkFactory.java

@@ -0,0 +1,146 @@
+package cc.iotkit.ruleengine.link;
+
+import cc.iotkit.common.utils.FIUtil;
+import cc.iotkit.ruleengine.link.impl.KafkaLink;
+import cc.iotkit.ruleengine.link.impl.MqttClientLink;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+/**
+ * @author huangwenl
+ * @date 2022-11-10
+ */
+@Slf4j
+public class LinkFactory {
+
+    private static final Map<String, BaseSinkLink> linkMap = new ConcurrentHashMap<>();
+    private static final Map<String, Set<String>> ruleLink = new ConcurrentHashMap<>();
+
+    /**
+     * 发送消息前,初始化连接
+     *
+     * @param key    连接标识
+     * @param type   连接类型
+     * @param config 连接配置
+     * @return
+     */
+    public static boolean initLink(String key, String type, Map<String, Object> config) {
+        AtomicBoolean exist = new AtomicBoolean(false);
+        FIUtil.isTotF(linkMap.containsKey(key)).handler(
+                () -> exist.set(true),
+                () -> exist.set(buildLink(key, type, config))
+        );
+        return exist.get();
+    }
+
+    /**
+     * 启动规则时,初始化连接
+     *
+     * @param ruleId
+     * @param ruleId 规则ID
+     * @param key    连接标识
+     * @param type   连接类型
+     * @param config 连接配置
+     * @return
+     */
+    public static boolean initLink(String ruleId, String key, String type, Map<String, Object> config) {
+        boolean result = initLink(key, type, config);
+        if (result) {
+            Set<String> linkKeys = Optional.ofNullable(ruleLink.get(ruleId)).orElse(new HashSet<>());
+            linkKeys.add(key);
+            ruleLink.put(ruleId, linkKeys);
+        }
+        return result;
+    }
+
+    /**
+     * 注册连接
+     *
+     * @param key  连接标识
+     * @param link 连接
+     */
+    public static void register(String key, BaseSinkLink link) {
+        log.info("连接器{}注册连接", key);
+        linkMap.put(key, link);
+
+        link.closeHandler(Void -> {
+            linkMap.remove(key);
+            log.info("连接器{}断开连接", key);
+        });
+    }
+
+    /**
+     * 发送消息
+     *
+     * @param key      连接标识
+     * @param msg      消息
+     * @param consumer 发送回调
+     */
+    public static void sendMsg(String key, Map<String, Object> msg, Consumer<String> consumer) {
+        try {
+            BaseSinkLink sinkLink = linkMap.get(key);
+            FIUtil.isTotF(sinkLink != null).handler(() -> sinkLink.send(msg, consumer),
+                    () -> consumer.accept(String.format("key:%s, 没有连接!", key)));
+        } catch (Exception e) {
+            e.printStackTrace();
+            consumer.accept(String.format("key:%s,发送异常:%s", key, e.getMessage()));
+        }
+
+    }
+
+    /**
+     * 停用规则
+     *
+     * @param ruleId
+     */
+    public static void ruleClose(String ruleId) {
+        Set<String> linkKeys = ruleLink.remove(ruleId);
+        // 排除其他规则也在用这个 link的
+        if (linkKeys != null && !linkKeys.isEmpty()) {
+            Set<String> existKey = new HashSet<>();
+            ruleLink.forEach((key, value) -> existKey.addAll(value));
+            linkKeys.removeAll(existKey);
+            linkKeys.forEach(LinkFactory::close);
+        }
+    }
+
+    /**
+     * 关闭连接
+     *
+     * @param key
+     */
+    public static void close(String key) {
+        BaseSinkLink link = linkMap.get(key);
+        if (link != null) {
+            link.close();
+        }
+    }
+
+    private static boolean buildLink(String key, String type, Map<String, Object> conf) {
+        boolean success = false;
+        BaseSinkLink link = null;
+        switch (type) {
+            case MqttClientLink.LINK_TYPE:
+                link = new MqttClientLink();
+                break;
+            case KafkaLink.LINK_TYPE:
+                link = new KafkaLink();
+                break;
+        }
+        if (link != null) {
+            success = link.open(conf);
+        }
+        if (success) {
+            register(key, link);
+        }
+        return success;
+    }
+
+}

+ 20 - 0
iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/LinkService.java

@@ -0,0 +1,20 @@
+package cc.iotkit.ruleengine.link;
+
+import java.util.Map;
+
+/**
+ * @author huangwenl
+ * @date 2022-11-11
+ */
+public interface LinkService {
+
+    default boolean initLink(String ruleId) {
+        return LinkFactory.initLink(ruleId, getKey(), getLinkType(), getLinkConf());
+    }
+
+    String getKey();
+
+    String getLinkType();
+
+    Map<String, Object> getLinkConf();
+}

+ 84 - 0
iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/impl/KafkaLink.java

@@ -0,0 +1,84 @@
+package cc.iotkit.ruleengine.link.impl;
+
+import cc.iotkit.common.utils.FIUtil;
+import cc.iotkit.ruleengine.link.BaseSinkLink;
+import io.vertx.core.Vertx;
+import io.vertx.kafka.client.producer.KafkaProducer;
+import io.vertx.kafka.client.producer.KafkaProducerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+/**
+ * kafka 连接器
+ * 支持自定义topic 和 分区, ack
+ * k-v 只支持String
+ *
+ * @author huangwenl
+ * @date 2022-11-11
+ */
+public class KafkaLink implements BaseSinkLink {
+    public static final String LINK_TYPE = "kafka";
+    public static final String TOPIC = "topic";
+    public static final String PAYLOAD = "payload";
+    public static final String PARTITION = "partition";
+
+    public static final String SERVERS = "servers";
+    public static final String ACK = "ack";
+
+    private KafkaProducer<String, String> producer;
+    private Consumer<Void> closeHandler;
+
+    @Override
+    public boolean open(Map<String, Object> config) {
+        try {
+            AtomicReference<Vertx> vertx = new AtomicReference<>();
+            FIUtil.isTotF(Vertx.currentContext() == null).handler(
+                    () -> vertx.set(Vertx.vertx()),
+                    () -> vertx.set(Vertx.currentContext().owner())
+            );
+            Map<String, String> kafkaConfig = new HashMap<>();
+            kafkaConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, (String) config.get(SERVERS));
+            kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            kafkaConfig.put(ProducerConfig.ACKS_CONFIG, (String) config.get(ACK));
+            producer = KafkaProducer.create(vertx.get(), kafkaConfig);
+        } catch (Exception e) {
+            e.printStackTrace();
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void send(Map<String, Object> msg, Consumer<String> consumer) {
+        AtomicReference<KafkaProducerRecord<String, String>> record = new AtomicReference<>();
+        FIUtil.isTotF(msg.containsKey(PARTITION)).handler(
+                () -> record.set(KafkaProducerRecord.create((String) msg.get(TOPIC), "", msg.get(PAYLOAD).toString(), (Integer) msg.get(PARTITION))),
+                () -> record.set(KafkaProducerRecord.create((String) msg.get(TOPIC), msg.get(PAYLOAD).toString())));
+        // todo 异步发送(不能确认是否成功)
+        producer.write(record.get());
+        consumer.accept(String.format("kafka,topic[%s],发送成功:%s", msg.get(TOPIC), msg.get(PAYLOAD).toString()));
+    }
+
+    @Override
+    public void close() {
+        try {
+            producer.close();
+            producer = null;
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            closeHandler.accept(null);
+        }
+    }
+
+    @Override
+    public void closeHandler(Consumer<Void> consumer) {
+        this.closeHandler = consumer;
+    }
+}

+ 75 - 0
iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/impl/MqttClientLink.java

@@ -0,0 +1,75 @@
+package cc.iotkit.ruleengine.link.impl;
+
+import cc.iotkit.common.utils.FIUtil;
+import cc.iotkit.ruleengine.link.BaseSinkLink;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.mqtt.MqttClient;
+import io.vertx.mqtt.MqttClientOptions;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+/**
+ * @author huangwenl
+ * @date 2022-11-10
+ */
+public class MqttClientLink implements BaseSinkLink {
+    public static final String LINK_TYPE = "mqtt";
+    public static final String TOPIC = "topic";
+    public static final String PASSWORD = "password";
+    public static final String USERNAME = "username";
+    public static final String HOST = "host";
+    public static final String PORT = "port";
+    public static final String PAYLOAD = "payload";
+
+    private MqttClient mqttClient;
+    private Consumer<Void> closeHandler;
+
+
+    @Override
+    public boolean open(Map<String, Object> config) {
+        try {
+            AtomicReference<Vertx> vertx = new AtomicReference<>();
+            FIUtil.isTotF(Vertx.currentContext() == null).handler(
+                    () -> vertx.set(Vertx.vertx()),
+                    () -> vertx.set(Vertx.currentContext().owner())
+            );
+            MqttClientOptions clientOptions = new MqttClientOptions();
+            clientOptions.setUsername((String) config.get(USERNAME));
+            clientOptions.setPassword((String) config.get(PASSWORD));
+            mqttClient = MqttClient.create(vertx.get(), clientOptions);
+            mqttClient.connect((int) config.get(PORT), (String) config.get(HOST));
+            mqttClient.closeHandler(Void -> closeHandler.accept(null));
+        } catch (Exception e) {
+            e.printStackTrace();
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void send(Map<String, Object> msg, Consumer<String> consumer) {
+        FIUtil.isTotF(mqttClient.isConnected()).handler(
+                () -> {
+                    mqttClient.publish((String) msg.get(TOPIC),
+                            Buffer.buffer(msg.get(PAYLOAD).toString()),
+                            MqttQoS.AT_MOST_ONCE, false, false);
+                    consumer.accept(String.format("mqtt, topic:[%s],发送成功:,%s", msg.get(TOPIC), msg.get(PAYLOAD).toString()));
+                },
+                () -> consumer.accept("mqtt,连接断开,发送失败"));
+    }
+
+    @Override
+    public void close() {
+        mqttClient.disconnect();
+        mqttClient = null;
+    }
+
+    @Override
+    public void closeHandler(Consumer<Void> consumer) {
+        this.closeHandler = consumer;
+    }
+}

+ 23 - 2
iot-rule-engine/src/main/java/cc/iotkit/ruleengine/rule/RuleManager.java

@@ -16,9 +16,14 @@ import cc.iotkit.model.Paging;
 import cc.iotkit.model.rule.RuleAction;
 import cc.iotkit.model.rule.RuleInfo;
 import cc.iotkit.ruleengine.action.*;
+import cc.iotkit.ruleengine.action.kafka.KafkaAction;
+import cc.iotkit.ruleengine.action.kafka.KafkaService;
+import cc.iotkit.ruleengine.action.mqtt.MqttAction;
+import cc.iotkit.ruleengine.action.mqtt.MqttService;
 import cc.iotkit.ruleengine.config.RuleConfiguration;
 import cc.iotkit.ruleengine.filter.DeviceFilter;
 import cc.iotkit.ruleengine.filter.Filter;
+import cc.iotkit.ruleengine.link.LinkFactory;
 import cc.iotkit.ruleengine.listener.DeviceListener;
 import cc.iotkit.ruleengine.listener.Listener;
 import lombok.SneakyThrows;
@@ -90,6 +95,8 @@ public class RuleManager {
 
     public void remove(String ruleId) {
         ruleMessageHandler.removeRule(ruleId);
+        // 移出link连接
+        LinkFactory.ruleClose(ruleId);
     }
 
     public void pause(String ruleId) {
@@ -111,7 +118,7 @@ public class RuleManager {
         }
         List<Action<?>> actions = new ArrayList<>();
         for (RuleAction action : ruleInfo.getActions()) {
-            actions.add(parseAction(action.getType(), action.getConfig()));
+            actions.add(parseAction(ruleInfo.getId(), action.getType(), action.getConfig()));
         }
 
         return new Rule(ruleInfo.getId(), ruleInfo.getName(), listeners, filters, actions);
@@ -133,7 +140,7 @@ public class RuleManager {
         return null;
     }
 
-    private Action<?> parseAction(String type, String config) {
+    private Action<?> parseAction(String ruleId, String type, String config) {
         if (DeviceAction.TYPE.equals(type)) {
             DeviceAction action = parse(config, DeviceAction.class);
             action.setDeviceActionService(deviceActionService);
@@ -144,6 +151,20 @@ public class RuleManager {
                 service.setDeviceInfoData(deviceInfoData);
             }
             return httpAction;
+        } else if (MqttAction.TYPE.equals(type)) {
+            MqttAction mqttAction = parse(config, MqttAction.class);
+            for (MqttService service : mqttAction.getServices()) {
+                service.setDeviceInfoData(deviceInfoData);
+                service.initLink(ruleId);
+            }
+            return mqttAction;
+        } else if (KafkaAction.TYPE.equals(type)) {
+            KafkaAction kafkaAction = parse(config, KafkaAction.class);
+            for (KafkaService service : kafkaAction.getServices()) {
+                service.setDeviceInfoData(deviceInfoData);
+                service.initLink(ruleId);
+            }
+            return kafkaAction;
         }
         return null;
     }

+ 6 - 0
pom.xml

@@ -184,6 +184,12 @@
                 <scope>import</scope>
             </dependency>
 
+            <dependency>
+                <groupId>io.vertx</groupId>
+                <artifactId>vertx-kafka-client</artifactId>
+                <version>${vertx.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>io.vertx</groupId>
                 <artifactId>vertx-core</artifactId>