DevicePropertyConsumer.java 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. /*
  2. * +----------------------------------------------------------------------
  3. * | Copyright (c) 奇特物联 2021-2022 All rights reserved.
  4. * +----------------------------------------------------------------------
  5. * | Licensed 未经许可不能去掉「奇特物联」相关版权
  6. * +----------------------------------------------------------------------
  7. * | Author: xw2sy@163.com
  8. * +----------------------------------------------------------------------
  9. */
  10. package cc.iotkit.comps.service;
  11. import cc.iotkit.common.constant.Constants;
  12. import cc.iotkit.common.utils.JsonUtils;
  13. import cc.iotkit.common.utils.StringUtils;
  14. import cc.iotkit.data.manager.IDeviceInfoData;
  15. import cc.iotkit.data.manager.IThingModelData;
  16. import cc.iotkit.model.device.DeviceInfo;
  17. import cc.iotkit.model.device.message.ThingModelMessage;
  18. import cc.iotkit.model.product.ThingModel;
  19. import cc.iotkit.mq.ConsumerHandler;
  20. import cc.iotkit.mq.MqConsumer;
  21. import cc.iotkit.temporal.IDevicePropertyData;
  22. import lombok.extern.slf4j.Slf4j;
  23. import org.springframework.beans.factory.annotation.Autowired;
  24. import org.springframework.beans.factory.annotation.Qualifier;
  25. import org.springframework.stereotype.Service;
  26. import javax.annotation.PostConstruct;
  27. import java.util.HashMap;
  28. import java.util.Map;
  29. import java.util.stream.Collectors;
  30. /**
  31. * 设备属性消息消费入库
  32. */
  33. @Slf4j
  34. @Service
  35. public class DevicePropertyConsumer implements ConsumerHandler<ThingModelMessage> {
  36. @Autowired
  37. private MqConsumer<ThingModelMessage> thingModelMessageMqConsumer;
  38. @Autowired
  39. private IDevicePropertyData devicePropertyData;
  40. @Autowired
  41. @Qualifier("deviceInfoDataCache")
  42. private IDeviceInfoData deviceInfoData;
  43. @Autowired
  44. @Qualifier("thingModelDataCache")
  45. private IThingModelData thingModelData;
  46. @PostConstruct
  47. public void init() {
  48. thingModelMessageMqConsumer.consume(Constants.DEVICE_PROPERTY_REPORT_TOPIC, this);
  49. }
  50. @Override
  51. public void handler(ThingModelMessage msg) {
  52. if (!(msg.getData() instanceof Map)) {
  53. return;
  54. }
  55. Map<String, Object> properties = (Map<String, Object>) msg.getData();
  56. String deviceId = msg.getDeviceId();
  57. DeviceInfo deviceInfo = deviceInfoData.findByDeviceId(deviceId);
  58. if (deviceInfo == null) {
  59. return;
  60. }
  61. //物模型校验,过滤非物模型属性
  62. ThingModel thingModel = thingModelData.findByProductKey(deviceInfo.getProductKey());
  63. if (thingModel == null) {
  64. return;
  65. }
  66. //物模型属性
  67. Map<String, ThingModel.DataType> thingModelProperties = thingModel.getModel().
  68. getProperties().stream().collect(Collectors.toMap(
  69. ThingModel.Property::getIdentifier, ThingModel.Property::getDataType));
  70. Map<String, Object> addProperties = new HashMap<>();
  71. //删除非属性字段
  72. properties.forEach((key,val)->{
  73. if (thingModelProperties.containsKey(key)) {
  74. addProperties.put(key,val);
  75. handleLocate(deviceInfo,val,thingModelProperties.get(key));
  76. }
  77. });
  78. //更新设备当前属性
  79. updateDeviceCurrentProperties(deviceId, addProperties);
  80. //保存属性记录
  81. try {
  82. devicePropertyData.addProperties(deviceId, addProperties, msg.getOccurred());
  83. } catch (Throwable e) {
  84. log.warn("save property data error", e);
  85. }
  86. }
  87. private void handleLocate(DeviceInfo deviceInfo,Object data,ThingModel.DataType dataType){
  88. if("position".equals(dataType.getType())){//如果是定位属性需要做一些处理
  89. Object specs = dataType.getSpecs();
  90. String locateType="";
  91. if (specs instanceof Map) {
  92. Object objlocateType = ((Map<?, ?>) specs).get("locateType");//定位方式
  93. if (objlocateType != null) {
  94. locateType = objlocateType.toString();
  95. }
  96. if(StringUtils.isNotBlank(locateType)){
  97. if("lonLat".equals(locateType)){//经纬度定位格式:经度,纬度
  98. String[] lonLats=data.toString().split(",");
  99. deviceInfo.getLocate().setLongitude(lonLats[0]);
  100. deviceInfo.getLocate().setLatitude(lonLats[1]);
  101. deviceInfoData.save(deviceInfo);
  102. }else if("basestation".equals(locateType)){//基站定位
  103. }else if("ipinfo".equals(locateType)){//ip定位
  104. }
  105. }
  106. }
  107. }
  108. }
  109. /**
  110. * 更新设备当前属性
  111. */
  112. private void updateDeviceCurrentProperties(String deviceId, Map<String, Object> properties) {
  113. try {
  114. log.info("save device property,deviceId:{},property:{}", deviceId, JsonUtils.toJsonString(properties));
  115. deviceInfoData.saveProperties(deviceId, properties);
  116. } catch (Throwable e) {
  117. log.error("save device current properties error", e);
  118. }
  119. }
  120. }