|
@@ -16,6 +16,7 @@ import cc.iotkit.model.stats.TimeData;
|
|
|
import cc.iotkit.temporal.IThingModelMessageData;
|
|
|
import cc.iotkit.temporal.es.dao.ThingModelMessageRepository;
|
|
|
import cc.iotkit.temporal.es.document.DocThingModelMessage;
|
|
|
+import cn.hutool.core.util.ObjectUtil;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.elasticsearch.index.query.BoolQueryBuilder;
|
|
|
import org.elasticsearch.index.query.QueryBuilders;
|
|
@@ -28,6 +29,7 @@ import org.springframework.data.domain.PageRequest;
|
|
|
import org.springframework.data.domain.Sort;
|
|
|
import org.springframework.data.elasticsearch.core.ElasticsearchAggregations;
|
|
|
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
|
|
|
+import org.springframework.data.elasticsearch.core.SearchHit;
|
|
|
import org.springframework.data.elasticsearch.core.SearchHits;
|
|
|
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
|
|
|
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
|
|
@@ -100,6 +102,92 @@ public class ThingModelMessageDataImpl implements IThingModelMessageData {
|
|
|
return data;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public List<TimeData> getDeviceUpMessageStatsWithUid(String uid, Long start, Long end) {
|
|
|
+ BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
|
|
|
+ if (ObjectUtil.isNotEmpty(start) && ObjectUtil.isNotEmpty(end)) {
|
|
|
+ queryBuilder.must(QueryBuilders.rangeQuery("time")
|
|
|
+ .from(start, true).to(end, true));
|
|
|
+ }
|
|
|
+
|
|
|
+ if ( ObjectUtil.isNotEmpty(uid) ) {
|
|
|
+ queryBuilder =
|
|
|
+ queryBuilder.must(QueryBuilders.termQuery("uid", uid));
|
|
|
+ }
|
|
|
+
|
|
|
+ // 查询字段type='property' and identifier='report', 或者 type='event' 的数据
|
|
|
+ queryBuilder = queryBuilder.must(QueryBuilders.boolQuery()
|
|
|
+ .should(QueryBuilders.boolQuery()
|
|
|
+ .must(QueryBuilders.termQuery("type", "property"))
|
|
|
+ .must(QueryBuilders.termQuery("identifier", "report")))
|
|
|
+ .should(QueryBuilders.termQuery("type", "event")));
|
|
|
+
|
|
|
+ NativeSearchQuery query = new NativeSearchQueryBuilder()
|
|
|
+ .withQuery(queryBuilder)
|
|
|
+ .withAggregations(AggregationBuilders.dateHistogram("agg")
|
|
|
+ .field("time")
|
|
|
+ .calendarInterval(DateHistogramInterval.HOUR)
|
|
|
+ .calendarInterval(DateHistogramInterval.hours(1))
|
|
|
+ )
|
|
|
+ .build();
|
|
|
+
|
|
|
+ ElasticsearchAggregations result = (ElasticsearchAggregations) template
|
|
|
+ .search(query, DocThingModelMessage.class).getAggregations();
|
|
|
+ ParsedDateHistogram histogram = result.aggregations().get("agg");
|
|
|
+
|
|
|
+ List<TimeData> data = new ArrayList<>();
|
|
|
+ for (Histogram.Bucket bucket : histogram.getBuckets()) {
|
|
|
+ long seconds = ((ZonedDateTime) bucket.getKey()).toInstant().getEpochSecond();
|
|
|
+ data.add(new TimeData(seconds * 1000, bucket.getDocCount()));
|
|
|
+ }
|
|
|
+
|
|
|
+ return data;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public List<TimeData> getDeviceDownMessageStatsWithUid(String uid, Long start, Long end) {
|
|
|
+ BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
|
|
|
+ if (ObjectUtil.isNotEmpty(start) && ObjectUtil.isNotEmpty(end)) {
|
|
|
+ queryBuilder.must(QueryBuilders.rangeQuery("time")
|
|
|
+ .from(start, true).to(end, true));
|
|
|
+ }
|
|
|
+
|
|
|
+ if ( ObjectUtil.isNotEmpty(uid) ) {
|
|
|
+ queryBuilder =
|
|
|
+ queryBuilder.must(QueryBuilders.termQuery("uid", uid));
|
|
|
+ }
|
|
|
+
|
|
|
+ // 查询字段type='property' and identifie!='report', 或者 type='service' 或者 type= 'config'
|
|
|
+ queryBuilder = queryBuilder.must(QueryBuilders.boolQuery()
|
|
|
+ .should(QueryBuilders.boolQuery()
|
|
|
+ .must(QueryBuilders.termQuery("type", "property"))
|
|
|
+ .must(QueryBuilders.boolQuery()
|
|
|
+ .mustNot(QueryBuilders.termQuery("identifier", "report"))))
|
|
|
+ .should(QueryBuilders.termQuery("type", "service"))
|
|
|
+ .should(QueryBuilders.termQuery("type", "config")));
|
|
|
+
|
|
|
+ NativeSearchQuery query = new NativeSearchQueryBuilder()
|
|
|
+ .withQuery(queryBuilder)
|
|
|
+ .withAggregations(AggregationBuilders.dateHistogram("agg")
|
|
|
+ .field("time")
|
|
|
+ .calendarInterval(DateHistogramInterval.HOUR)
|
|
|
+ .calendarInterval(DateHistogramInterval.hours(1))
|
|
|
+ )
|
|
|
+ .build();
|
|
|
+
|
|
|
+ ElasticsearchAggregations result = (ElasticsearchAggregations) template
|
|
|
+ .search(query, DocThingModelMessage.class).getAggregations();
|
|
|
+ ParsedDateHistogram histogram = result.aggregations().get("agg");
|
|
|
+
|
|
|
+ List<TimeData> data = new ArrayList<>();
|
|
|
+ for (Histogram.Bucket bucket : histogram.getBuckets()) {
|
|
|
+ long seconds = ((ZonedDateTime) bucket.getKey()).toInstant().getEpochSecond();
|
|
|
+ data.add(new TimeData(seconds * 1000, bucket.getDocCount()));
|
|
|
+ }
|
|
|
+
|
|
|
+ return data;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void add(ThingModelMessage msg) {
|
|
|
thingModelMessageRepository.save(MapstructUtils.convert(msg, DocThingModelMessage.class));
|