|
@@ -3,149 +3,113 @@ package vip.xiaonuo.coldchain.core.event;
|
|
|
import com.github.jfcloud.influxdb.service.JfcloudInfluxDBService;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.springframework.context.ApplicationEventPublisher;
|
|
|
import org.springframework.context.event.EventListener;
|
|
|
+import org.springframework.scheduling.annotation.Async;
|
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import vip.xiaonuo.coldchain.core.bean.influxdb.SensorData;
|
|
|
-import vip.xiaonuo.coldchain.core.event.alarm.SensorThreshold;
|
|
|
-import vip.xiaonuo.coldchain.core.event.alarm.ThresholdService;
|
|
|
+import vip.xiaonuo.coldchain.core.config.JfcloudColdChainConstants;
|
|
|
+import vip.xiaonuo.coldchain.core.event.alarm.service.SensorAlarmChecker;
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.Objects;
|
|
|
import java.util.Queue;
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
-import java.util.stream.Collectors;
|
|
|
|
|
|
-/**
|
|
|
- * 事件监听器:处理 SensorData 事件,缓存数据并进行定时写入数据库,同时检查是否需要报警
|
|
|
- */
|
|
|
@Slf4j
|
|
|
@RequiredArgsConstructor
|
|
|
@Component
|
|
|
public class SensorDataEventListener2 {
|
|
|
private final JfcloudInfluxDBService jfcloudInfluxDBService;
|
|
|
- private final ApplicationEventPublisher eventPublisher;
|
|
|
- private final ThresholdService thresholdService;
|
|
|
- // 使用线程安全的队列来缓存传感器数据
|
|
|
- private final Queue<SensorData> sensorDataQueue = new ConcurrentLinkedQueue<>();
|
|
|
- // 队列最大容量和写入数据库的时间间隔
|
|
|
- private static final int MAX_QUEUE_SIZE = 10;
|
|
|
- private static final long CHECK_INTERVAL_MS = 10000L; // 每10秒检查一次
|
|
|
+ private final SensorAlarmChecker sensorAlarmChecker;
|
|
|
+ // 使用双缓冲机制:一个队列缓存数据,另一个队列进行批量写入
|
|
|
+ private final Queue<SensorData> bufferQueue = new ConcurrentLinkedQueue<>();
|
|
|
+ private final Queue<SensorData> writeQueue = new ConcurrentLinkedQueue<>();
|
|
|
|
|
|
- /**
|
|
|
- * 监听 SensorDataEvent 事件,收到传感器数据后进行缓存和处理
|
|
|
- *
|
|
|
- * @param event 传感器数据事件
|
|
|
- */
|
|
|
+ // 事件监听,处理传感器数据
|
|
|
@EventListener
|
|
|
public void handleSensorDataEvent(SensorDataEvent event) {
|
|
|
SensorData sensorData = event.getSensorData();
|
|
|
- // 将数据放入队列并检查是否需要报警
|
|
|
- addSensorData(sensorData);
|
|
|
- // 获取传感器的阈值范围
|
|
|
- SensorThreshold sensorThreshold = thresholdService.getThresholdBySensorId(sensorData.getDeviceId(), sensorData.getRoads());
|
|
|
- // 新探头 不做校验
|
|
|
- if (Objects.nonNull(sensorThreshold)) {
|
|
|
- // 在数据放入队列后进行报警检查
|
|
|
- checkForAlarm(sensorData, sensorThreshold);
|
|
|
+ if (Objects.nonNull(sensorData)) {
|
|
|
+ // 将数据添加到缓存队列
|
|
|
+ addSensorDataToBufferQueue(sensorData);
|
|
|
+ // 获取传感器的阈值范围并检查报警
|
|
|
+ sensorAlarmChecker.checkAlarm(sensorData);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 将新的 SensorData 加入队列,并根据队列大小判断是否需要写入数据库
|
|
|
- *
|
|
|
- * @param sensorData 传感器数据
|
|
|
- */
|
|
|
- public void addSensorData(SensorData sensorData) {
|
|
|
- // 将数据放入队列
|
|
|
- sensorDataQueue.offer(sensorData);
|
|
|
- // 如果队列达到最大容量,批量写入数据库
|
|
|
- if (sensorDataQueue.size() >= MAX_QUEUE_SIZE) {
|
|
|
- writeDataToDatabase();
|
|
|
+ // 将数据添加到缓存队列中
|
|
|
+ public void addSensorDataToBufferQueue(SensorData sensorData) {
|
|
|
+ bufferQueue.offer(sensorData);
|
|
|
+ // 如果缓存队列的数据超过设定阈值,转移数据到写入队列
|
|
|
+ if (bufferQueue.size() >= JfcloudColdChainConstants.MAX_QUEUE_SIZE) {
|
|
|
+ transferDataToWriteQueue();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 定时检查队列并写入数据库
|
|
|
- * 每隔 CHECK_INTERVAL_MS 时间检查一次
|
|
|
- */
|
|
|
- @Scheduled(fixedRate = CHECK_INTERVAL_MS)
|
|
|
+ // 定期检查并批量写入数据
|
|
|
+ @Scheduled(fixedRate = JfcloudColdChainConstants.CHECK_INTERVAL_MS)
|
|
|
public void checkAndWriteData() {
|
|
|
- // 只有队列达到一定大小才进行写入
|
|
|
- if (sensorDataQueue.size() >= MAX_QUEUE_SIZE) {
|
|
|
- writeDataToDatabase();
|
|
|
+ // 如果缓存队列达到阈值,则将数据转移到写入队列并异步写入
|
|
|
+ if (bufferQueue.size() >= JfcloudColdChainConstants.MAX_QUEUE_SIZE) {
|
|
|
+ transferDataToWriteQueue();
|
|
|
+ // 异步写入数据到数据库
|
|
|
+ writeDataToDatabaseAsync();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 将队列中的数据批量写入数据库,并清空队列
|
|
|
- */
|
|
|
- private void writeDataToDatabase() {
|
|
|
- if (!sensorDataQueue.isEmpty()) {
|
|
|
- try {
|
|
|
- log.info("开始批量写入数据到 InfluxDB,队列大小: {}", sensorDataQueue.size());
|
|
|
-
|
|
|
- // 将队列中的数据转为 List
|
|
|
- List<SensorData> dataList = sensorDataQueue.stream()
|
|
|
- .collect(Collectors.toList());
|
|
|
+ // 将缓存队列的数据转移到写入队列
|
|
|
+ private void transferDataToWriteQueue() {
|
|
|
+ if (!bufferQueue.isEmpty()) {
|
|
|
+ writeQueue.addAll(bufferQueue);
|
|
|
+ bufferQueue.clear(); // 清空缓存队列
|
|
|
+ log.debug("数据已从缓存队列转移到写入队列,当前写入队列大小: {}", writeQueue.size());
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- // 批量写入数据库
|
|
|
+ // 异步写入数据到数据库
|
|
|
+ @Async("coldChainAsyncTask")
|
|
|
+ public void writeDataToDatabaseAsync() {
|
|
|
+ if (!writeQueue.isEmpty()) {
|
|
|
+ List<SensorData> dataList = new ArrayList<>(writeQueue);
|
|
|
+ try {
|
|
|
+ log.info("开始异步批量写入数据到 InfluxDB,数据量: {}", dataList.size());
|
|
|
jfcloudInfluxDBService.writePojo(dataList);
|
|
|
-
|
|
|
- // 写入完成后清空队列
|
|
|
- sensorDataQueue.clear();
|
|
|
-
|
|
|
- log.info("成功批量写入数据到 InfluxDB,队列已清空");
|
|
|
-
|
|
|
+ writeQueue.clear(); // 写入完成后清空写入队列
|
|
|
+ log.info("数据成功写入 InfluxDB,队列已清空");
|
|
|
} catch (Exception e) {
|
|
|
log.error("写入数据到 InfluxDB 时出错: {}", e.getMessage(), e);
|
|
|
+ // 如果写入失败,进行重试
|
|
|
+ retryWriteDataToDatabase(dataList);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 检查传感器数据是否满足报警条件
|
|
|
- * 触发报警事件
|
|
|
- *
|
|
|
- * @param sensorData 传感器数据
|
|
|
- * @param threshold 传感器的阈值范围
|
|
|
+ * 如果写入失败,进行重试
|
|
|
*/
|
|
|
- private void checkForAlarm(SensorData sensorData, SensorThreshold threshold) {
|
|
|
- // 检查温度是否超标
|
|
|
- if (sensorData.getTemperature() != null) {
|
|
|
- if (sensorData.getTemperature() < threshold.getTemperatureMin() || sensorData.getTemperature() > threshold.getTemperatureMax()) {
|
|
|
- publishAlarm("温度超标", sensorData.getTemperature(), "°C");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // 检查湿度是否超标
|
|
|
- if (sensorData.getHumidity() != null) {
|
|
|
- if (sensorData.getHumidity() < threshold.getHumidityMin() || sensorData.getHumidity() > threshold.getHumidityMax()) {
|
|
|
- publishAlarm("湿度超标", sensorData.getHumidity(), "%");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // 检查二氧化碳是否超标
|
|
|
- if (sensorData.getCo2() != null) {
|
|
|
- if (sensorData.getCo2() < threshold.getCo2Min() || sensorData.getCo2() > threshold.getCo2Max()) {
|
|
|
- publishAlarm("二氧化碳超标", sensorData.getCo2(), "ppm");
|
|
|
+ private void retryWriteDataToDatabase(List<SensorData> dataList) {
|
|
|
+ int retries = JfcloudColdChainConstants.MAX_RETRIES;
|
|
|
+ int delay = JfcloudColdChainConstants.RETRY_DELAY_MS;
|
|
|
+ while (retries > 0) {
|
|
|
+ try {
|
|
|
+ log.info("正在进行重试,剩余次数: {}", retries);
|
|
|
+ jfcloudInfluxDBService.writePojo(dataList);
|
|
|
+ log.info("数据成功写入 InfluxDB,重试成功");
|
|
|
+ return;
|
|
|
+ } catch (Exception e) {
|
|
|
+ retries--;
|
|
|
+ log.error("重试失败,剩余重试次数: {}, 错误信息: {}", retries, e.getMessage());
|
|
|
+ if (retries == 0) {
|
|
|
+ log.error("重试已达最大次数,放弃写入操作");
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ Thread.sleep(delay); // 等待一段时间后重试
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * 发布报警事件
|
|
|
- *
|
|
|
- * @param alarmType 报警类型
|
|
|
- * @param value 超标的数值
|
|
|
- * @param unit 单位
|
|
|
- */
|
|
|
- private void publishAlarm(String alarmType, Float value, String unit) {
|
|
|
- String alarmMessage = String.format("%s: %.2f %s", alarmType, value, unit);
|
|
|
- log.warn("数据异常,发布报警: {}", alarmMessage);
|
|
|
-
|
|
|
- // 发布报警事件(假设 AlarmEvent 已经定义)
|
|
|
- eventPublisher.publishEvent(new SensorAlarmEvent(this/*, alarmMessage*/));
|
|
|
- }
|
|
|
}
|