DeviceMessageConsumer.java 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. /*
  2. * +----------------------------------------------------------------------
  3. * | Copyright (c) 奇特物联 2021-2022 All rights reserved.
  4. * +----------------------------------------------------------------------
  5. * | Licensed 未经许可不能去掉「奇特物联」相关版权
  6. * +----------------------------------------------------------------------
  7. * | Author: xw2sy@163.com
  8. * +----------------------------------------------------------------------
  9. */
  10. package cc.iotkit.comps.service;
  11. import cc.iotkit.common.Constants;
  12. import cc.iotkit.data.manager.IDeviceInfoData;
  13. import cc.iotkit.model.device.DeviceInfo;
  14. import cc.iotkit.model.device.message.ThingModelMessage;
  15. import cc.iotkit.mq.ConsumerHandler;
  16. import cc.iotkit.mq.MqConsumer;
  17. import cc.iotkit.mq.MqProducer;
  18. import cc.iotkit.temporal.IThingModelMessageData;
  19. import lombok.extern.slf4j.Slf4j;
  20. import org.springframework.beans.factory.annotation.Autowired;
  21. import org.springframework.beans.factory.annotation.Qualifier;
  22. import org.springframework.context.annotation.Lazy;
  23. import org.springframework.stereotype.Service;
  24. import javax.annotation.PostConstruct;
  25. @Slf4j
  26. @Service
  27. public class DeviceMessageConsumer implements ConsumerHandler<ThingModelMessage> {
  28. @Lazy
  29. @Autowired
  30. private IThingModelMessageData thingModelMessageData;
  31. @Autowired
  32. @Qualifier("deviceInfoDataCache")
  33. private IDeviceInfoData deviceInfoData;
  34. @Autowired
  35. private MqConsumer<ThingModelMessage> thingModelMessageConsumer;
  36. @Autowired
  37. private MqProducer<ThingModelMessage> thingModelMessageMqProducer;
  38. @PostConstruct
  39. public void init() {
  40. thingModelMessageConsumer.consume(Constants.THING_MODEL_MESSAGE_TOPIC, this);
  41. }
  42. @Override
  43. public void handler(ThingModelMessage msg) {
  44. try {
  45. String type = msg.getType();
  46. //重新发布属性入库消息
  47. if (ThingModelMessage.TYPE_PROPERTY.equals(type)
  48. && "report".equals(msg.getIdentifier())) {
  49. thingModelMessageMqProducer.publish(Constants.DEVICE_PROPERTY_REPORT_TOPIC, msg);
  50. }
  51. if (ThingModelMessage.TYPE_CONFIG.equals(type)) {
  52. //重新发布设备配置消息,用于设备配置下发
  53. thingModelMessageMqProducer.publish(Constants.DEVICE_CONFIG_TOPIC, msg);
  54. }
  55. DeviceInfo device = deviceInfoData.findByDeviceId(msg.getDeviceId());
  56. if (device == null) {
  57. return;
  58. }
  59. msg.setUid(device.getUid());
  60. //设备消息入库
  61. thingModelMessageData.add(msg);
  62. } catch (Throwable e) {
  63. //不能重复消费
  64. log.error("device message consumer error", e);
  65. }
  66. }
  67. }