RuleManager.java 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. /*
  2. * +----------------------------------------------------------------------
  3. * | Copyright (c) 奇特物联 2021-2022 All rights reserved.
  4. * +----------------------------------------------------------------------
  5. * | Licensed 未经许可不能去掉「奇特物联」相关版权
  6. * +----------------------------------------------------------------------
  7. * | Author: xw2sy@163.com
  8. * +----------------------------------------------------------------------
  9. */
  10. package cc.iotkit.ruleengine.rule;
  11. import cc.iotkit.common.api.PageRequest;
  12. import cc.iotkit.common.api.Paging;
  13. import cc.iotkit.common.utils.JsonUtils;
  14. import cc.iotkit.common.utils.StringUtils;
  15. import cc.iotkit.data.manager.IDeviceInfoData;
  16. import cc.iotkit.data.manager.IRuleInfoData;
  17. import cc.iotkit.model.rule.FilterConfig;
  18. import cc.iotkit.model.rule.RuleAction;
  19. import cc.iotkit.model.rule.RuleInfo;
  20. import cc.iotkit.ruleengine.action.Action;
  21. import cc.iotkit.ruleengine.action.device.DeviceAction;
  22. import cc.iotkit.ruleengine.action.device.DeviceActionService;
  23. import cc.iotkit.ruleengine.action.http.HttpAction;
  24. import cc.iotkit.ruleengine.action.http.HttpService;
  25. import cc.iotkit.ruleengine.action.kafka.KafkaAction;
  26. import cc.iotkit.ruleengine.action.kafka.KafkaService;
  27. import cc.iotkit.ruleengine.action.mqtt.MqttAction;
  28. import cc.iotkit.ruleengine.action.mqtt.MqttService;
  29. import cc.iotkit.ruleengine.action.tcp.TcpAction;
  30. import cc.iotkit.ruleengine.action.tcp.TcpService;
  31. import cc.iotkit.ruleengine.config.RuleConfiguration;
  32. import cc.iotkit.ruleengine.filter.DeviceFilter;
  33. import cc.iotkit.ruleengine.filter.Filter;
  34. import cc.iotkit.ruleengine.link.LinkFactory;
  35. import cc.iotkit.ruleengine.listener.DeviceListener;
  36. import cc.iotkit.ruleengine.listener.Listener;
  37. import cn.hutool.core.collection.CollectionUtil;
  38. import lombok.SneakyThrows;
  39. import lombok.extern.slf4j.Slf4j;
  40. import org.springframework.beans.factory.annotation.Autowired;
  41. import org.springframework.beans.factory.annotation.Qualifier;
  42. import org.springframework.stereotype.Component;
  43. import java.util.ArrayList;
  44. import java.util.List;
  45. import java.util.concurrent.Executors;
  46. import java.util.concurrent.ScheduledExecutorService;
  47. import java.util.concurrent.TimeUnit;
  48. @Component
  49. @Slf4j
  50. public class RuleManager {
  51. @Autowired
  52. private RuleConfiguration ruleConfiguration;
  53. @Autowired
  54. private RuleMessageHandler ruleMessageHandler;
  55. @Autowired
  56. private IRuleInfoData ruleInfoData;
  57. @Autowired
  58. @Qualifier("deviceInfoPropertyDataCache")
  59. private IDeviceInfoData deviceInfoData;
  60. @Autowired
  61. private DeviceActionService deviceActionService;
  62. public RuleManager() {
  63. ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
  64. executorService.schedule(this::initRules, 1, TimeUnit.SECONDS);
  65. }
  66. @SneakyThrows
  67. public void initRules() {
  68. int idx = 1;
  69. while (true) {
  70. PageRequest<RuleInfo> pageRequest = new PageRequest<>();
  71. pageRequest.setPageNum(idx);
  72. pageRequest.setPageSize(100);
  73. Paging<RuleInfo> all = ruleInfoData.findAll(pageRequest);
  74. List<RuleInfo> rules = all.getRows();
  75. if (CollectionUtil.isEmpty(rules)) {
  76. return;
  77. }
  78. rules.forEach(rule -> {
  79. try {
  80. //不添加停止的规则
  81. if (RuleInfo.STATE_STOPPED.equals(rule.getState())) {
  82. return;
  83. }
  84. log.info("got rule {} to init", rule.getId());
  85. add(rule);
  86. } catch (Throwable e) {
  87. log.error("add rule error", e);
  88. }
  89. });
  90. idx++;
  91. }
  92. }
  93. public void add(RuleInfo ruleInfo) {
  94. Rule rule = parseRule(ruleInfo);
  95. ruleMessageHandler.putRule(rule);
  96. }
  97. public void remove(String ruleId) {
  98. ruleMessageHandler.removeRule(ruleId);
  99. // 移出link连接
  100. LinkFactory.ruleClose(ruleId);
  101. }
  102. public void pause(String ruleId) {
  103. remove(ruleId);
  104. }
  105. public void resume(RuleInfo ruleInfo) {
  106. add(ruleInfo);
  107. }
  108. private Rule parseRule(RuleInfo ruleInfo) {
  109. List<Listener<?>> listeners = new ArrayList<>();
  110. for (FilterConfig listener : ruleInfo.getListeners()) {
  111. if (StringUtils.isBlank(listener.getConfig())) {
  112. continue;
  113. }
  114. listeners.add(parseListener(listener.getType(), listener.getConfig()));
  115. }
  116. List<Filter<?>> filters = new ArrayList<>();
  117. for (FilterConfig filter : ruleInfo.getFilters()) {
  118. if (StringUtils.isBlank(filter.getConfig())) {
  119. continue;
  120. }
  121. filters.add(parseFilter(filter.getType(), filter.getConfig()));
  122. }
  123. List<Action<?>> actions = new ArrayList<>();
  124. for (RuleAction action : ruleInfo.getActions()) {
  125. if (StringUtils.isBlank(action.getConfig())) {
  126. continue;
  127. }
  128. actions.add(parseAction(ruleInfo.getId(), action.getType(), action.getConfig()));
  129. }
  130. return new Rule(ruleInfo.getId(), ruleInfo.getName(), listeners, filters, actions);
  131. }
  132. private Listener<?> parseListener(String type, String config) {
  133. if (DeviceListener.TYPE.equals(type)) {
  134. return parse(config, DeviceListener.class);
  135. }
  136. return null;
  137. }
  138. private Filter<?> parseFilter(String type, String config) {
  139. if (DeviceFilter.TYPE.equals(type)) {
  140. DeviceFilter filter = parse(config, DeviceFilter.class);
  141. filter.setDeviceInfoData(deviceInfoData);
  142. return filter;
  143. }
  144. return null;
  145. }
  146. private Action<?> parseAction(String ruleId, String type, String config) {
  147. if (DeviceAction.TYPE.equals(type)) {
  148. DeviceAction action = parse(config, DeviceAction.class);
  149. action.setDeviceActionService(deviceActionService);
  150. return action;
  151. } else if (HttpAction.TYPE.equals(type)) {
  152. HttpAction httpAction = parse(config, HttpAction.class);
  153. for (HttpService service : httpAction.getServices()) {
  154. service.setDeviceInfoData(deviceInfoData);
  155. }
  156. return httpAction;
  157. } else if (MqttAction.TYPE.equals(type)) {
  158. MqttAction mqttAction = parse(config, MqttAction.class);
  159. for (MqttService service : mqttAction.getServices()) {
  160. service.setDeviceInfoData(deviceInfoData);
  161. service.initLink(ruleId);
  162. }
  163. return mqttAction;
  164. } else if (KafkaAction.TYPE.equals(type)) {
  165. KafkaAction kafkaAction = parse(config, KafkaAction.class);
  166. for (KafkaService service : kafkaAction.getServices()) {
  167. service.setDeviceInfoData(deviceInfoData);
  168. service.initLink(ruleId);
  169. }
  170. return kafkaAction;
  171. } else if (TcpAction.TYPE.equals(type)) {
  172. TcpAction tcpAction = parse(config, TcpAction.class);
  173. for (TcpService service : tcpAction.getServices()) {
  174. service.setDeviceInfoData(deviceInfoData);
  175. service.initLink(ruleId);
  176. }
  177. return tcpAction;
  178. }
  179. return null;
  180. }
  181. private <T> T parse(String config, Class<T> cls) {
  182. return JsonUtils.parseObject(config, cls);
  183. }
  184. }