|
@@ -0,0 +1,134 @@
|
|
|
+package vip.xiaonuo.coldchain.core.event;
|
|
|
+
|
|
|
+import com.github.jfcloud.influxdb.service.JfcloudInfluxDBService;
|
|
|
+import lombok.RequiredArgsConstructor;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.context.ApplicationEventPublisher;
|
|
|
+import org.springframework.context.event.EventListener;
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import vip.xiaonuo.coldchain.core.bean.influxdb.SensorData;
|
|
|
+import vip.xiaonuo.coldchain.core.event.alarm.SensorThreshold;
|
|
|
+
|
|
|
+import java.util.LinkedList;
|
|
|
+import java.util.Queue;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 事件监听器:处理 SensorData 事件,缓存数据并进行定时写入数据库,同时检查是否需要报警
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@RequiredArgsConstructor
|
|
|
+@Component
|
|
|
+public class SensorDataEventListener2 {
|
|
|
+ private final JfcloudInfluxDBService jfcloudInfluxDBService;
|
|
|
+ private final ApplicationEventPublisher eventPublisher;
|
|
|
+ // 使用队列缓存 SensorData
|
|
|
+ private final Queue<SensorData> sensorDataQueue = new LinkedList<>();
|
|
|
+ // 队列最大容量和写入数据库的时间间隔
|
|
|
+ private static final int MAX_QUEUE_SIZE = 100;
|
|
|
+ // 每10秒检查一次
|
|
|
+ private static final long CHECK_INTERVAL_MS = 10000L;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 监听 SensorDataEvent 事件,收到传感器数据后进行缓存和处理
|
|
|
+ *
|
|
|
+ * @param event 传感器数据事件
|
|
|
+ */
|
|
|
+ @EventListener
|
|
|
+ public void handleSensorDataEvent(SensorDataEvent event) {
|
|
|
+ SensorData sensorData = event.getSensorData();
|
|
|
+
|
|
|
+ // 将数据放入队列
|
|
|
+ addSensorData(sensorData);
|
|
|
+
|
|
|
+ // 在数据放入队列后进行报警检查
|
|
|
+ checkForAlarm(sensorData);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 将新的 SensorData 加入队列,并根据队列大小判断是否需要写入数据库
|
|
|
+ *
|
|
|
+ * @param sensorData 传感器数据
|
|
|
+ */
|
|
|
+ public void addSensorData(SensorData sensorData) {
|
|
|
+ synchronized (sensorDataQueue) {
|
|
|
+ sensorDataQueue.offer(sensorData); // 将数据放入队列
|
|
|
+ }
|
|
|
+
|
|
|
+ // 如果队列达到最大容量,批量写入数据库
|
|
|
+ if (sensorDataQueue.size() >= MAX_QUEUE_SIZE) {
|
|
|
+ writeDataToDatabase();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 定时检查队列并写入数据库
|
|
|
+ * 每隔 CHECK_INTERVAL_MS 时间检查一次
|
|
|
+ */
|
|
|
+ @Scheduled(fixedRate = CHECK_INTERVAL_MS)
|
|
|
+ public void checkAndWriteData() {
|
|
|
+ synchronized (sensorDataQueue) {
|
|
|
+ if (!sensorDataQueue.isEmpty()) {
|
|
|
+ writeDataToDatabase();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 将队列中的数据批量写入数据库,并清空队列
|
|
|
+ */
|
|
|
+ private void writeDataToDatabase() {
|
|
|
+ synchronized (sensorDataQueue) {
|
|
|
+ if (!sensorDataQueue.isEmpty()) {
|
|
|
+ try {
|
|
|
+ log.info("开始批量写入数据到 InfluxDB,队列大小: {}", sensorDataQueue.size());
|
|
|
+ // 批量写入数据
|
|
|
+ for (SensorData sensorData : sensorDataQueue) {
|
|
|
+ jfcloudInfluxDBService.writePojo(sensorData);
|
|
|
+ }
|
|
|
+ // 写入完成后清空队列
|
|
|
+ sensorDataQueue.clear();
|
|
|
+ log.info("成功批量写入数据到 InfluxDB,队列已清空");
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("写入数据到 InfluxDB 时出错: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 检查传感器数据是否满足报警条件
|
|
|
+ * 触发报警事件
|
|
|
+ *
|
|
|
+ * @param sensorData 传感器数据
|
|
|
+ */
|
|
|
+ private void checkForAlarm(SensorData sensorData, SensorThreshold threshold) {
|
|
|
+ // 检查温度是否超标
|
|
|
+ if (sensorData.getTemperature() != null) {
|
|
|
+ if (sensorData.getTemperature() < threshold.getTemperatureMin() || sensorData.getTemperature() > threshold.getTemperatureMax()) {
|
|
|
+ publishAlarm("温度超标", sensorData.getTemperature(), "°C");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 检查湿度是否超标
|
|
|
+ if (sensorData.getHumidity() != null) {
|
|
|
+ if (sensorData.getHumidity() < threshold.getHumidityMin() || sensorData.getHumidity() > threshold.getHumidityMax()) {
|
|
|
+ publishAlarm("湿度超标", sensorData.getHumidity(), "%");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 检查二氧化碳是否超标
|
|
|
+ if (sensorData.getCo2() != null) {
|
|
|
+ if (sensorData.getCo2() < threshold.getCo2Min() || sensorData.getCo2() > threshold.getCo2Max()) {
|
|
|
+ publishAlarm("二氧化碳超标", sensorData.getCo2(), "ppm");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void publishAlarm(String alarmType, Float value, String unit) {
|
|
|
+ String alarmMessage = String.format("%s: %.2f %s", alarmType, value, unit);
|
|
|
+ log.warn("数据异常,发布报警: {}", alarmMessage);
|
|
|
+ // 发布报警事件(假设 AlarmEvent 已经定义)
|
|
|
+ eventPublisher.publishEvent(new SensorAlarmEvent(this/*, alarmMessage*/));
|
|
|
+ }
|
|
|
+}
|