|
@@ -9,8 +9,10 @@ import org.springframework.scheduling.annotation.Scheduled;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
import vip.xiaonuo.coldchain.core.bean.influxdb.SensorData;
|
|
import vip.xiaonuo.coldchain.core.bean.influxdb.SensorData;
|
|
import vip.xiaonuo.coldchain.core.event.alarm.SensorThreshold;
|
|
import vip.xiaonuo.coldchain.core.event.alarm.SensorThreshold;
|
|
|
|
+import vip.xiaonuo.coldchain.core.event.alarm.ThresholdService;
|
|
|
|
|
|
import java.util.LinkedList;
|
|
import java.util.LinkedList;
|
|
|
|
+import java.util.List;
|
|
import java.util.Queue;
|
|
import java.util.Queue;
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -22,10 +24,11 @@ import java.util.Queue;
|
|
public class SensorDataEventListener2 {
|
|
public class SensorDataEventListener2 {
|
|
private final JfcloudInfluxDBService jfcloudInfluxDBService;
|
|
private final JfcloudInfluxDBService jfcloudInfluxDBService;
|
|
private final ApplicationEventPublisher eventPublisher;
|
|
private final ApplicationEventPublisher eventPublisher;
|
|
|
|
+ private final ThresholdService thresholdService;
|
|
// 使用队列缓存 SensorData
|
|
// 使用队列缓存 SensorData
|
|
private final Queue<SensorData> sensorDataQueue = new LinkedList<>();
|
|
private final Queue<SensorData> sensorDataQueue = new LinkedList<>();
|
|
// 队列最大容量和写入数据库的时间间隔
|
|
// 队列最大容量和写入数据库的时间间隔
|
|
- private static final int MAX_QUEUE_SIZE = 100;
|
|
|
|
|
|
+ private static final int MAX_QUEUE_SIZE = 10;
|
|
// 每10秒检查一次
|
|
// 每10秒检查一次
|
|
private static final long CHECK_INTERVAL_MS = 10000L;
|
|
private static final long CHECK_INTERVAL_MS = 10000L;
|
|
|
|
|
|
@@ -37,12 +40,12 @@ public class SensorDataEventListener2 {
|
|
@EventListener
|
|
@EventListener
|
|
public void handleSensorDataEvent(SensorDataEvent event) {
|
|
public void handleSensorDataEvent(SensorDataEvent event) {
|
|
SensorData sensorData = event.getSensorData();
|
|
SensorData sensorData = event.getSensorData();
|
|
-
|
|
|
|
// 将数据放入队列
|
|
// 将数据放入队列
|
|
addSensorData(sensorData);
|
|
addSensorData(sensorData);
|
|
-
|
|
|
|
|
|
+ // 获取传感器的阈值范围
|
|
|
|
+ SensorThreshold sensorThreshold = thresholdService.getThresholdBySensorId(sensorData.getDeviceId(), sensorData.getRoads());
|
|
// 在数据放入队列后进行报警检查
|
|
// 在数据放入队列后进行报警检查
|
|
- checkForAlarm(sensorData);
|
|
|
|
|
|
+ checkForAlarm(sensorData,sensorThreshold);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -82,10 +85,13 @@ public class SensorDataEventListener2 {
|
|
if (!sensorDataQueue.isEmpty()) {
|
|
if (!sensorDataQueue.isEmpty()) {
|
|
try {
|
|
try {
|
|
log.info("开始批量写入数据到 InfluxDB,队列大小: {}", sensorDataQueue.size());
|
|
log.info("开始批量写入数据到 InfluxDB,队列大小: {}", sensorDataQueue.size());
|
|
- // 批量写入数据
|
|
|
|
- for (SensorData sensorData : sensorDataQueue) {
|
|
|
|
- jfcloudInfluxDBService.writePojo(sensorData);
|
|
|
|
- }
|
|
|
|
|
|
+// // 批量写入数据
|
|
|
|
+// for (SensorData sensorData : sensorDataQueue) {
|
|
|
|
+// jfcloudInfluxDBService.writePojo(sensorData);
|
|
|
|
+// }
|
|
|
|
+ List<SensorData> lists = sensorDataQueue.stream()
|
|
|
|
+ .toList();
|
|
|
|
+ jfcloudInfluxDBService.writePojo(lists);
|
|
// 写入完成后清空队列
|
|
// 写入完成后清空队列
|
|
sensorDataQueue.clear();
|
|
sensorDataQueue.clear();
|
|
log.info("成功批量写入数据到 InfluxDB,队列已清空");
|
|
log.info("成功批量写入数据到 InfluxDB,队列已清空");
|