|
@@ -2,15 +2,14 @@ package cc.iotkit.comps.service;
|
|
|
|
|
|
import cc.iotkit.common.Constants;
|
|
|
import cc.iotkit.common.utils.JsonUtil;
|
|
|
-import cc.iotkit.comps.config.ServerConfig;
|
|
|
import cc.iotkit.dao.*;
|
|
|
import cc.iotkit.model.device.DeviceInfo;
|
|
|
import cc.iotkit.model.device.message.DeviceProperty;
|
|
|
import cc.iotkit.model.device.message.DeviceReport;
|
|
|
import cc.iotkit.model.device.message.ThingModelMessage;
|
|
|
-import lombok.SneakyThrows;
|
|
|
+import cc.iotkit.mq.ConsumerHandler;
|
|
|
+import cc.iotkit.mq.MqConsumer;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.apache.pulsar.client.api.*;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.context.annotation.Lazy;
|
|
|
import org.springframework.stereotype.Service;
|
|
@@ -21,10 +20,7 @@ import java.util.UUID;
|
|
|
|
|
|
@Slf4j
|
|
|
@Service
|
|
|
-public class DeviceMessageConsumer implements MessageListener<ThingModelMessage> {
|
|
|
-
|
|
|
- @Autowired
|
|
|
- private ServerConfig serverConfig;
|
|
|
+public class DeviceMessageConsumer implements ConsumerHandler<ThingModelMessage> {
|
|
|
@Lazy
|
|
|
@Autowired
|
|
|
private ThingModelMessageRepository messageRepository;
|
|
@@ -38,37 +34,29 @@ public class DeviceMessageConsumer implements MessageListener<ThingModelMessage>
|
|
|
private DeviceDao deviceDao;
|
|
|
@Autowired
|
|
|
private DeviceCache deviceCache;
|
|
|
+ @Autowired
|
|
|
+ private MqConsumer<ThingModelMessage> thingModelMessageConsumer;
|
|
|
|
|
|
@PostConstruct
|
|
|
- public void init() throws PulsarClientException {
|
|
|
- PulsarClient client = PulsarClient.builder()
|
|
|
- .serviceUrl(this.serverConfig.getPulsarBrokerUrl())
|
|
|
- .build();
|
|
|
-
|
|
|
- client.newConsumer(Schema.JSON(ThingModelMessage.class))
|
|
|
- .topic("persistent://iotkit/default/" + Constants.THING_MODEL_MESSAGE_TOPIC)
|
|
|
- .subscriptionName("thing-model-message")
|
|
|
- .consumerName("thing-model-message-consumer")
|
|
|
- .messageListener(this).subscribe();
|
|
|
+ public void init() {
|
|
|
+ thingModelMessageConsumer.consume(Constants.THING_MODEL_MESSAGE_TOPIC, this);
|
|
|
}
|
|
|
|
|
|
- @SneakyThrows
|
|
|
@Override
|
|
|
- public void received(Consumer<ThingModelMessage> consumer, Message<ThingModelMessage> msg) {
|
|
|
+ public void handler(ThingModelMessage msg) {
|
|
|
try {
|
|
|
- ThingModelMessage modelMessage = msg.getValue();
|
|
|
- String deviceId = modelMessage.getDeviceId();
|
|
|
- log.info("save message to es:{}", JsonUtil.toJsonString(modelMessage));
|
|
|
+ String deviceId = msg.getDeviceId();
|
|
|
+ log.info("save message to es:{}", JsonUtil.toJsonString(msg));
|
|
|
//属性入库
|
|
|
- if (ThingModelMessage.TYPE_PROPERTY.equals(modelMessage.getType())
|
|
|
- && "report".equals(modelMessage.getIdentifier())) {
|
|
|
+ if (ThingModelMessage.TYPE_PROPERTY.equals(msg.getType())
|
|
|
+ && "report".equals(msg.getIdentifier())) {
|
|
|
log.info("update device property,deviceId:{},property:{}",
|
|
|
- deviceId, JsonUtil.toJsonString(modelMessage.getData()));
|
|
|
- deviceDao.updateProperties(deviceId, (Map<String, Object>) modelMessage.getData());
|
|
|
+ deviceId, JsonUtil.toJsonString(msg.getData()));
|
|
|
+ deviceDao.updateProperties(deviceId, (Map<String, Object>) msg.getData());
|
|
|
|
|
|
//设备属性历史数据存储
|
|
|
- if (modelMessage.getData() instanceof Map) {
|
|
|
- Map map = (Map) modelMessage.getData();
|
|
|
+ if (msg.getData() instanceof Map) {
|
|
|
+ Map map = (Map) msg.getData();
|
|
|
int index = 0;
|
|
|
for (Object key : map.keySet()) {
|
|
|
index++;
|
|
@@ -76,11 +64,11 @@ public class DeviceMessageConsumer implements MessageListener<ThingModelMessage>
|
|
|
propertyRepository.save(
|
|
|
new DeviceProperty(
|
|
|
//防止重复id被覆盖
|
|
|
- modelMessage.getMid() + "_" + index,
|
|
|
+ msg.getMid() + "_" + index,
|
|
|
deviceId,
|
|
|
key.toString(),
|
|
|
map.get(key),
|
|
|
- modelMessage.getOccurred()
|
|
|
+ msg.getOccurred()
|
|
|
)
|
|
|
);
|
|
|
} catch (Throwable e) {
|
|
@@ -93,9 +81,9 @@ public class DeviceMessageConsumer implements MessageListener<ThingModelMessage>
|
|
|
try {
|
|
|
//todo 存在性能问题,量大可再拆分处理
|
|
|
//设备消息日志入库
|
|
|
- messageRepository.save(modelMessage);
|
|
|
+ messageRepository.save(msg);
|
|
|
//设备上报日志入库
|
|
|
- deviceReportRepository.save(getDeviceReport(modelMessage));
|
|
|
+ deviceReportRepository.save(getDeviceReport(msg));
|
|
|
} catch (Throwable e) {
|
|
|
log.warn("save device message to es error", e);
|
|
|
}
|
|
@@ -103,7 +91,6 @@ public class DeviceMessageConsumer implements MessageListener<ThingModelMessage>
|
|
|
//不能重复消费
|
|
|
log.error("device message consumer error", e);
|
|
|
}
|
|
|
- consumer.acknowledge(msg);
|
|
|
}
|
|
|
|
|
|
private DeviceReport getDeviceReport(ThingModelMessage message) {
|
|
@@ -120,10 +107,4 @@ public class DeviceMessageConsumer implements MessageListener<ThingModelMessage>
|
|
|
.time(message.getTime())
|
|
|
.build();
|
|
|
}
|
|
|
-
|
|
|
- @Override
|
|
|
- public void reachedEndOfTopic(Consumer<ThingModelMessage> consumer) {
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
}
|