KafkaService.java 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package cc.iotkit.ruleengine.action.kafka;
  2. import cc.iotkit.common.utils.FIUtil;
  3. import cc.iotkit.model.device.message.ThingModelMessage;
  4. import cc.iotkit.ruleengine.action.ScriptService;
  5. import cc.iotkit.ruleengine.link.LinkFactory;
  6. import cc.iotkit.ruleengine.link.LinkService;
  7. import cc.iotkit.ruleengine.link.impl.KafkaLink;
  8. import lombok.Data;
  9. import lombok.EqualsAndHashCode;
  10. import lombok.extern.slf4j.Slf4j;
  11. import java.util.HashMap;
  12. import java.util.Map;
  13. import java.util.concurrent.atomic.AtomicReference;
  14. /**
  15. * @author huangwenl
  16. * @date 2022-11-11
  17. */
  18. @EqualsAndHashCode(callSuper = true)
  19. @Slf4j
  20. @Data
  21. public class KafkaService extends ScriptService implements LinkService {
  22. private String services;
  23. private String ack;
  24. public String execute(ThingModelMessage msg) {
  25. //执行转换脚本
  26. Map result = execScript(msg);
  27. if (result == null) {
  28. log.warn("execScript result is null");
  29. return "execScript result is null";
  30. }
  31. boolean initResult = LinkFactory.initLink(getKey(), KafkaLink.LINK_TYPE, getLinkConf());
  32. AtomicReference<String> data = new AtomicReference<>("");
  33. FIUtil.isTotF(initResult).handler(
  34. () -> LinkFactory.sendMsg(getKey(), result, data::set),
  35. () -> data.set("创建连接失败!")
  36. );
  37. return data.get();
  38. }
  39. @Override
  40. public String getKey() {
  41. return String.format("kafka_%s", services);
  42. }
  43. @Override
  44. public String getLinkType() {
  45. return KafkaLink.LINK_TYPE;
  46. }
  47. @Override
  48. public Map<String, Object> getLinkConf() {
  49. Map<String, Object> config = new HashMap<>();
  50. config.put(KafkaLink.SERVERS, services);
  51. config.put(KafkaLink.ACK, ack);
  52. return config;
  53. }
  54. }