|
@@ -1,44 +1,66 @@
|
|
|
package vip.xiaonuo.coldchain.core.alarm.service.messagepush;
|
|
|
|
|
|
+import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.scheduling.annotation.Async;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import vip.xiaonuo.coldchain.core.alarm.bean.NotificationChannel;
|
|
|
import vip.xiaonuo.coldchain.core.alarm.bean.SensorAlarm;
|
|
|
+import vip.xiaonuo.coldchain.core.alarm.bean.SensorAlarmUser;
|
|
|
import vip.xiaonuo.coldchain.core.alarm.mapper.SensorAlarmMapper;
|
|
|
+import vip.xiaonuo.coldchain.core.config.JfcloudColdChainConstants;
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
@Service
|
|
|
@Slf4j
|
|
|
+@RequiredArgsConstructor
|
|
|
public class SensorAlarmMessagePushService {
|
|
|
-
|
|
|
- // 用一个 Map 来存储不同的推送渠道服务
|
|
|
private final Map<String, MessagePushService> pushServices;
|
|
|
+ private final SensorAlarmMapper sensorAlarmMapper;
|
|
|
+ // 用来记录每个用户对某设备和告警类型的推送历史,key 为 (openid, deviceID, alarmType),value 为推送时间列表
|
|
|
+ private final Map<String, List<Long>> userDeviceAlarmPushHistory = new ConcurrentHashMap<>();
|
|
|
|
|
|
- @Autowired
|
|
|
- private SensorAlarmMapper sensorAlarmMapper; // 注入 MyBatis-Plus 的 Mapper
|
|
|
-
|
|
|
- @Autowired
|
|
|
- public SensorAlarmMessagePushService(Map<String, MessagePushService> pushServices) {
|
|
|
- this.pushServices = pushServices;
|
|
|
- }
|
|
|
|
|
|
/**
|
|
|
* 根据通知渠道推送消息,并更新通知状态
|
|
|
*
|
|
|
* @param alarm 告警信息
|
|
|
*/
|
|
|
+ @Async("coldChainAsyncTask")
|
|
|
public void pushAlarmMessage(SensorAlarm alarm) {
|
|
|
- // 遍历每个接收人和通知渠道,发送告警信息
|
|
|
- for (NotificationChannel channel : alarm.getNotificationChannel()) {
|
|
|
- MessagePushService pushService = pushServices.get(channel.name());
|
|
|
- log.info("推送消息渠道: {}", channel.getChannelName());
|
|
|
- if (pushService != null) {
|
|
|
- pushService.sendAlarmMessage(alarm);
|
|
|
- } else {
|
|
|
- log.info("不支持的通知渠道:" + channel);
|
|
|
+ String deviceID = alarm.getDeviceId(); // 获取设备ID
|
|
|
+ String alarmType = alarm.getAlarmType(); // 获取告警类型
|
|
|
+ List<String> openids = alarm.getAlarmUsers().stream().map(SensorAlarmUser::getOpenId).toList();
|
|
|
+ // 遍历每个 openid,针对每个用户进行推送次数限制和推送
|
|
|
+ for (String openid : openids) {
|
|
|
+ // 判断该用户对该设备和告警类型的推送是否超过限制
|
|
|
+ if (hasExceededPushLimit(openid, deviceID, alarmType)) {
|
|
|
+ log.info("用户 {} 对设备 {} 的告警类型 {} 在过去一个小时内推送次数超过限制,跳过推送", openid, deviceID, alarmType);
|
|
|
+ continue; // 跳过该用户
|
|
|
}
|
|
|
+ // 遍历每个接收人和通知渠道,发送告警信息
|
|
|
+ for (NotificationChannel channel : alarm.getNotificationChannel()) {
|
|
|
+ MessagePushService pushService = pushServices.get(channel.name());
|
|
|
+ log.info("推送消息渠道: {}", channel.getChannelName());
|
|
|
+
|
|
|
+ if (pushService != null) {
|
|
|
+ try {
|
|
|
+ // 调用推送服务
|
|
|
+ pushService.sendAlarmMessage(alarm);
|
|
|
+ log.info("告警信息成功推送至渠道: {}", channel.getChannelName());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("推送消息失败,渠道: {},错误: {}", channel.getChannelName(), e.getMessage());
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.warn("不支持的通知渠道: {}", channel.getChannelName());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 记录推送时间
|
|
|
+ recordPushTime(openid, deviceID, alarmType);
|
|
|
}
|
|
|
// 设置已通知状态
|
|
|
alarm.setNotified(true);
|
|
@@ -46,13 +68,63 @@ public class SensorAlarmMessagePushService {
|
|
|
saveAlarmToDatabase(alarm);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 判断用户对某设备和某告警类型的推送是否超过了推送次数限制
|
|
|
+ *
|
|
|
+ * @param openid 用户的 openid
|
|
|
+ * @param deviceID 设备ID
|
|
|
+ * @param alarmType 告警类型
|
|
|
+ * @return 是否超过限制
|
|
|
+ */
|
|
|
+ private boolean hasExceededPushLimit(String openid, String deviceID, String alarmType) {
|
|
|
+ String key = generatePushHistoryKey(openid, deviceID, alarmType);
|
|
|
+ List<Long> pushTimes = userDeviceAlarmPushHistory.get(key);
|
|
|
+ if (pushTimes == null) {
|
|
|
+ return false; // 没有记录,说明该用户、设备、告警类型组合没有推送记录
|
|
|
+ }
|
|
|
+ // 清理超过时间窗口的记录
|
|
|
+ long currentTime = System.currentTimeMillis();
|
|
|
+ pushTimes.removeIf(time -> currentTime - time > JfcloudColdChainConstants.MESS_PUSH_TIME_WINDOW);
|
|
|
+ // 如果该用户对该设备和告警类型组合在当前时间窗口内已经超过最大推送次数,则不再推送
|
|
|
+ return pushTimes.size() >= JfcloudColdChainConstants.MESS_PUSH_MAX_PUSH_COUNT;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 生成唯一的推送历史记录的键值,基于用户 openid、设备ID和告警类型
|
|
|
+ *
|
|
|
+ * @param openid 用户的 openid
|
|
|
+ * @param deviceID 设备ID
|
|
|
+ * @param alarmType 告警类型
|
|
|
+ * @return 唯一的键值
|
|
|
+ */
|
|
|
+ private String generatePushHistoryKey(String openid, String deviceID, String alarmType) {
|
|
|
+ return openid + "_" + deviceID + "_" + alarmType;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 记录用户对某设备和某告警类型的推送时间
|
|
|
+ *
|
|
|
+ * @param openid 用户的 openid
|
|
|
+ * @param deviceID 设备ID
|
|
|
+ * @param alarmType 告警类型
|
|
|
+ */
|
|
|
+ private void recordPushTime(String openid, String deviceID, String alarmType) {
|
|
|
+ String key = generatePushHistoryKey(openid, deviceID, alarmType);
|
|
|
+ List<Long> pushTimes = userDeviceAlarmPushHistory.computeIfAbsent(key, k -> new ArrayList<>());
|
|
|
+ pushTimes.add(System.currentTimeMillis());
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 保存告警信息到数据库
|
|
|
*
|
|
|
* @param alarm 告警信息
|
|
|
*/
|
|
|
private void saveAlarmToDatabase(SensorAlarm alarm) {
|
|
|
- // 调用 MyBatis-Plus 的 saveOrUpdate 方法保存或更新告警数据
|
|
|
- sensorAlarmMapper.insert(alarm); // 假设 mapper 有这个方法
|
|
|
+ try {
|
|
|
+ sensorAlarmMapper.insert(alarm);
|
|
|
+ log.info("告警信息已保存到数据库: {}", alarm.getId());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("保存告警信息到数据库失败: {}", e.getMessage());
|
|
|
+ }
|
|
|
}
|
|
|
}
|