|
@@ -10,12 +10,12 @@ import org.springframework.stereotype.Service;
|
|
|
import vip.xiaonuo.coldchain.core.bean.influxdb.SensorData;
|
|
|
import vip.xiaonuo.coldchain.core.util.DateFormatter;
|
|
|
|
|
|
-import java.time.Duration;
|
|
|
import java.time.Instant;
|
|
|
import java.time.LocalDateTime;
|
|
|
import java.time.ZoneOffset;
|
|
|
import java.time.format.DateTimeFormatter;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
@@ -63,14 +63,16 @@ public class JfcloudSensorDataService extends JfcloudFluxDataService<SensorData>
|
|
|
* @param roads 路数
|
|
|
* @param startTimeStr 查询开始时间
|
|
|
* @param endTimeStr 查询结束时间
|
|
|
+ * @param field temperature/humidity/co2
|
|
|
* @return 查询到的传感器数据列表
|
|
|
*/
|
|
|
|
|
|
- public List<SensorData> queryDataByDeviceIdAndRoads(String deviceId, Integer roads, String startTimeStr, String endTimeStr) {
|
|
|
+ public List<SensorData> queryDataByDeviceIdAndRoads(String deviceId, Integer roads, String startTimeStr, String endTimeStr,String field) {
|
|
|
Assert.notNull(deviceId, "deviceId cannot be null");
|
|
|
Assert.notNull(roads, "roads cannot be null");
|
|
|
Assert.notNull(startTimeStr, "startTime cannot be null");
|
|
|
Assert.notNull(endTimeStr, "endTime cannot be null");
|
|
|
+ Assert.notNull(field, "field cannot be null");
|
|
|
//替换起始时间和结束时间2024/09/09 这种格式为2024-09-09
|
|
|
startTimeStr = DateFormatter.replaceDateFormat(startTimeStr);
|
|
|
endTimeStr = DateFormatter.replaceDateFormat(endTimeStr);
|
|
@@ -83,49 +85,19 @@ public class JfcloudSensorDataService extends JfcloudFluxDataService<SensorData>
|
|
|
}
|
|
|
String startTimeFormatted = convertToISO8601(startTimeStr);
|
|
|
String endTimeFormatted = convertToISO8601(endTimeStr);
|
|
|
- // Determine the aggregation window (by day or by hour)
|
|
|
- String aggregationWindow = determineAggregationWindow(startTimeFormatted, endTimeFormatted);
|
|
|
- String query = buildRangeTimeFluxQuery(getBucketName(), deviceId, roads + "", startTimeFormatted, endTimeFormatted, aggregationWindow);
|
|
|
- // 执行查询
|
|
|
+ String aggregationWindow = FluxAggregationUtils.determineAggregationWindow(startTimeFormatted, endTimeFormatted);
|
|
|
+ String measurement = "sensor_data";
|
|
|
+ Map<String, String> filters = Map.of(
|
|
|
+ "device_id", deviceId,
|
|
|
+ "roads", roads.toString()
|
|
|
+ );
|
|
|
+ String query = FluxQueryBuilder.buildRangeQuery(getBucketName(), startTimeFormatted, endTimeFormatted, measurement, field, filters, aggregationWindow);
|
|
|
QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
List<FluxTable> results = queryApi.query(query);
|
|
|
-
|
|
|
- // 将查询结果转换为 SensorData 实体并返回
|
|
|
return results.stream().flatMap(table -> table.getRecords().stream()).map(fluxRecord -> mapRecordToEntity(fluxRecord, getEntityClass())) // 转换为 SensorData 实体
|
|
|
.collect(Collectors.toList()); // 返回多条数据
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Builds the Flux query dynamically based on the provided parameters.
|
|
|
- *
|
|
|
- * @param bucketName The name of the InfluxDB bucket.
|
|
|
- * @param deviceId The device ID to filter by.
|
|
|
- * @param roads The roads field to filter by.
|
|
|
- * @param startTime The start time of the query (ISO format).
|
|
|
- * @param stopTime The end time of the query (ISO format).
|
|
|
- * @return The Flux query as a string.
|
|
|
- */
|
|
|
- public static String buildRangeTimeFluxQuery(String bucketName, String deviceId, String roads, String startTime, String stopTime, String aggregationWindow) {
|
|
|
- return String.format("temperature = from(bucket: \"%s\")\n" + " |> range(start: time(v: \"%s\"), stop: time(v: \"%s\"))\n" + " |> filter(fn: (r) => r[\"_measurement\"] == \"sensor_data\" and r[\"_field\"] == \"temperature\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\")\n" + " |> aggregateWindow(every: %s, fn: mean, createEmpty: false)\n" + "humidity = from(bucket: \"%s\")\n" + " |> range(start: time(v: \"%s\"), stop: time(v: \"%s\"))\n" + " |> filter(fn: (r) => r[\"_measurement\"] == \"sensor_data\" and r[\"_field\"] == \"humidity\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\")\n" + " |> aggregateWindow(every: %s, fn: mean, createEmpty: false)\n" + "co2 = from(bucket: \"%s\")\n" + " |> range(start: time(v: \"%s\"), stop: time(v: \"%s\"))\n" + " |> filter(fn: (r) => r[\"_measurement\"] == \"sensor_data\" and r[\"_field\"] == \"co2\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\")\n" + " |> aggregateWindow(every: %s, fn: mean, createEmpty: false)\n" + "union(tables: [temperature, humidity, co2])\n" + " |> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")\n" + " |> yield(name: \"latest_data\")", bucketName, startTime, stopTime, deviceId, roads, aggregationWindow, bucketName, startTime, stopTime, deviceId, roads, aggregationWindow, bucketName, startTime, stopTime, deviceId, roads, aggregationWindow);
|
|
|
- }
|
|
|
-
|
|
|
- public static String determineAggregationWindow(String startTime, String stopTime) {
|
|
|
- Instant start = Instant.parse(startTime);
|
|
|
- Instant stop = Instant.parse(stopTime);
|
|
|
- // Calculate the duration between start and stop
|
|
|
- Duration duration = Duration.between(start, stop);
|
|
|
- // If the duration is more than 30 days, return "1w" (weekly aggregation)
|
|
|
- if (duration.toDays() > 30) {
|
|
|
- return "1w"; // Weekly aggregation
|
|
|
- }
|
|
|
- // If the duration is more than 1 day but less than or equal to 30 days, return "1d" (daily aggregation)
|
|
|
- else if (duration.toDays() > 1) {
|
|
|
- return "1d"; // Daily aggregation
|
|
|
- } else {
|
|
|
- // If the duration is less than or equal to 1 day, return "1h" (hourly aggregation)
|
|
|
- return "1h"; // Hourly aggregation
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
/**
|
|
|
* Converts a time string in "yyyy-MM-dd HH:mm:ss" format to ISO 8601 format.
|