|
@@ -11,9 +11,11 @@ import vip.xiaonuo.coldchain.core.bean.influxdb.SensorData;
|
|
|
import vip.xiaonuo.coldchain.core.event.alarm.SensorThreshold;
|
|
|
import vip.xiaonuo.coldchain.core.event.alarm.ThresholdService;
|
|
|
|
|
|
-import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
+import java.util.Objects;
|
|
|
import java.util.Queue;
|
|
|
+import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
* 事件监听器:处理 SensorData 事件,缓存数据并进行定时写入数据库,同时检查是否需要报警
|
|
@@ -25,12 +27,11 @@ public class SensorDataEventListener2 {
|
|
|
private final JfcloudInfluxDBService jfcloudInfluxDBService;
|
|
|
private final ApplicationEventPublisher eventPublisher;
|
|
|
private final ThresholdService thresholdService;
|
|
|
- // 使用队列缓存 SensorData
|
|
|
- private final Queue<SensorData> sensorDataQueue = new LinkedList<>();
|
|
|
+ // 使用线程安全的队列来缓存传感器数据
|
|
|
+ private final Queue<SensorData> sensorDataQueue = new ConcurrentLinkedQueue<>();
|
|
|
// 队列最大容量和写入数据库的时间间隔
|
|
|
private static final int MAX_QUEUE_SIZE = 10;
|
|
|
- // 每10秒检查一次
|
|
|
- private static final long CHECK_INTERVAL_MS = 10000L;
|
|
|
+ private static final long CHECK_INTERVAL_MS = 10000L; // 每10秒检查一次
|
|
|
|
|
|
/**
|
|
|
* 监听 SensorDataEvent 事件,收到传感器数据后进行缓存和处理
|
|
@@ -40,12 +41,15 @@ public class SensorDataEventListener2 {
|
|
|
@EventListener
|
|
|
public void handleSensorDataEvent(SensorDataEvent event) {
|
|
|
SensorData sensorData = event.getSensorData();
|
|
|
- // 将数据放入队列
|
|
|
+ // 将数据放入队列并检查是否需要报警
|
|
|
addSensorData(sensorData);
|
|
|
// 获取传感器的阈值范围
|
|
|
SensorThreshold sensorThreshold = thresholdService.getThresholdBySensorId(sensorData.getDeviceId(), sensorData.getRoads());
|
|
|
- // 在数据放入队列后进行报警检查
|
|
|
- checkForAlarm(sensorData,sensorThreshold);
|
|
|
+ // 新探头 不做校验
|
|
|
+ if (Objects.nonNull(sensorThreshold)) {
|
|
|
+ // 在数据放入队列后进行报警检查
|
|
|
+ checkForAlarm(sensorData, sensorThreshold);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -54,10 +58,8 @@ public class SensorDataEventListener2 {
|
|
|
* @param sensorData 传感器数据
|
|
|
*/
|
|
|
public void addSensorData(SensorData sensorData) {
|
|
|
- synchronized (sensorDataQueue) {
|
|
|
- sensorDataQueue.offer(sensorData); // 将数据放入队列
|
|
|
- }
|
|
|
-
|
|
|
+ // 将数据放入队列
|
|
|
+ sensorDataQueue.offer(sensorData);
|
|
|
// 如果队列达到最大容量,批量写入数据库
|
|
|
if (sensorDataQueue.size() >= MAX_QUEUE_SIZE) {
|
|
|
writeDataToDatabase();
|
|
@@ -70,10 +72,9 @@ public class SensorDataEventListener2 {
|
|
|
*/
|
|
|
@Scheduled(fixedRate = CHECK_INTERVAL_MS)
|
|
|
public void checkAndWriteData() {
|
|
|
- synchronized (sensorDataQueue) {
|
|
|
- if (!sensorDataQueue.isEmpty()) {
|
|
|
- writeDataToDatabase();
|
|
|
- }
|
|
|
+ // 只有队列达到一定大小才进行写入
|
|
|
+ if (sensorDataQueue.size() >= MAX_QUEUE_SIZE) {
|
|
|
+ writeDataToDatabase();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -81,23 +82,24 @@ public class SensorDataEventListener2 {
|
|
|
* 将队列中的数据批量写入数据库,并清空队列
|
|
|
*/
|
|
|
private void writeDataToDatabase() {
|
|
|
- synchronized (sensorDataQueue) {
|
|
|
- if (!sensorDataQueue.isEmpty()) {
|
|
|
- try {
|
|
|
- log.info("开始批量写入数据到 InfluxDB,队列大小: {}", sensorDataQueue.size());
|
|
|
-// // 批量写入数据
|
|
|
-// for (SensorData sensorData : sensorDataQueue) {
|
|
|
-// jfcloudInfluxDBService.writePojo(sensorData);
|
|
|
-// }
|
|
|
- List<SensorData> lists = sensorDataQueue.stream()
|
|
|
- .toList();
|
|
|
- jfcloudInfluxDBService.writePojo(lists);
|
|
|
- // 写入完成后清空队列
|
|
|
- sensorDataQueue.clear();
|
|
|
- log.info("成功批量写入数据到 InfluxDB,队列已清空");
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("写入数据到 InfluxDB 时出错: {}", e.getMessage(), e);
|
|
|
- }
|
|
|
+ if (!sensorDataQueue.isEmpty()) {
|
|
|
+ try {
|
|
|
+ log.info("开始批量写入数据到 InfluxDB,队列大小: {}", sensorDataQueue.size());
|
|
|
+
|
|
|
+ // 将队列中的数据转为 List
|
|
|
+ List<SensorData> dataList = sensorDataQueue.stream()
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ // 批量写入数据库
|
|
|
+ jfcloudInfluxDBService.writePojo(dataList);
|
|
|
+
|
|
|
+ // 写入完成后清空队列
|
|
|
+ sensorDataQueue.clear();
|
|
|
+
|
|
|
+ log.info("成功批量写入数据到 InfluxDB,队列已清空");
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("写入数据到 InfluxDB 时出错: {}", e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -107,6 +109,7 @@ public class SensorDataEventListener2 {
|
|
|
* 触发报警事件
|
|
|
*
|
|
|
* @param sensorData 传感器数据
|
|
|
+ * @param threshold 传感器的阈值范围
|
|
|
*/
|
|
|
private void checkForAlarm(SensorData sensorData, SensorThreshold threshold) {
|
|
|
// 检查温度是否超标
|
|
@@ -131,9 +134,17 @@ public class SensorDataEventListener2 {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 发布报警事件
|
|
|
+ *
|
|
|
+ * @param alarmType 报警类型
|
|
|
+ * @param value 超标的数值
|
|
|
+ * @param unit 单位
|
|
|
+ */
|
|
|
private void publishAlarm(String alarmType, Float value, String unit) {
|
|
|
String alarmMessage = String.format("%s: %.2f %s", alarmType, value, unit);
|
|
|
log.warn("数据异常,发布报警: {}", alarmMessage);
|
|
|
+
|
|
|
// 发布报警事件(假设 AlarmEvent 已经定义)
|
|
|
eventPublisher.publishEvent(new SensorAlarmEvent(this/*, alarmMessage*/));
|
|
|
}
|