|
@@ -3,15 +3,16 @@ package vip.xiaonuo.coldchain.core.service;
|
|
|
import cn.hutool.core.lang.Assert;
|
|
|
import com.github.jfcloud.influxdb.config.JfcloudInfluxDB2Properties;
|
|
|
import com.github.jfcloud.influxdb.flux.JfcloudFluxDataService;
|
|
|
-import com.github.jfcloud.influxdb.flux.QueryCondition;
|
|
|
import com.github.jfcloud.influxdb.service.JfcloudInfluxDBService;
|
|
|
import com.influxdb.client.QueryApi;
|
|
|
import com.influxdb.query.FluxTable;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import vip.xiaonuo.coldchain.core.bean.influxdb.SensorData;
|
|
|
|
|
|
+import java.time.Duration;
|
|
|
import java.time.Instant;
|
|
|
-import java.time.ZoneId;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.time.ZoneOffset;
|
|
|
import java.time.format.DateTimeFormatter;
|
|
|
import java.util.List;
|
|
|
import java.util.stream.Collectors;
|
|
@@ -38,30 +39,22 @@ public class JfcloudSensorDataService extends JfcloudFluxDataService<SensorData>
|
|
|
* @return 最新的传感器数据
|
|
|
*/
|
|
|
public SensorData queryLatestDataByDeviceIdAndRoads(String deviceId, Integer roads) {
|
|
|
- String fluxQuery = String.format(
|
|
|
- "from(bucket: \"%s\") " +
|
|
|
- "|> range(start: -1d) " +
|
|
|
- "|> filter(fn: (r) => r._measurement == \"%s\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\") " +
|
|
|
- "|> sort(columns: [\"_time\"], desc: true) " +
|
|
|
- "|> limit(n: 1) " +
|
|
|
- "|> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\") " +
|
|
|
- "|> keep(columns: [\"_time\", \"temperature\", \"humidity\", \"co2\", \"battery\", \"plugInStatus\", \"location\", \"longitude\", \"latitude\", \"device_id\", \"roads\", \"modelName\", \"create_time\"])",
|
|
|
- getBucketName(),
|
|
|
- getMeasurement(SensorData.class),
|
|
|
- deviceId,
|
|
|
- roads
|
|
|
- );
|
|
|
+ String fluxQuery = buildLatestSensorDataQuery(getBucketName(), deviceId, roads + "");
|
|
|
QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
List<FluxTable> tables = queryApi.query(fluxQuery);
|
|
|
|
|
|
// 将查询结果映射为 SensorData 实体
|
|
|
- return tables.stream()
|
|
|
- .flatMap(table -> table.getRecords().stream()) // 拉平所有记录
|
|
|
- .map(fluxRecord -> mapRecordToEntity(fluxRecord,getEntityClass())) // 将每个记录映射为 SensorData 实体
|
|
|
+ return tables.stream().flatMap(table -> table.getRecords().stream()) // 拉平所有记录
|
|
|
+ .map(fluxRecord -> mapRecordToEntity(fluxRecord, getEntityClass())) // 将每个记录映射为 SensorData 实体
|
|
|
.findFirst() // 只取第一条记录(即最新一条记录)
|
|
|
.orElse(null); // 如果没有数据,返回 null
|
|
|
}
|
|
|
|
|
|
+ public String buildLatestSensorDataQuery(String bucketName, String deviceId, String roads) {
|
|
|
+ return String.format("temperature = from(bucket: \"%s\")\n" + " |> range(start: -1h) // Hardcoded time range for the last hour\n" + " |> filter(fn: (r) => r[\"_measurement\"] == \"sensor_data\" and r[\"_field\"] == \"temperature\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\")\n" + " |> last()\n" + "humidity = from(bucket: \"%s\")\n" + " |> range(start: -1h)\n" + " |> filter(fn: (r) => r[\"_measurement\"] == \"sensor_data\" and r[\"_field\"] == \"humidity\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\")\n" + " |> last()\n" + "co2 = from(bucket: \"%s\")\n" + " |> range(start: -1h)\n" + " |> filter(fn: (r) => r[\"_measurement\"] == \"sensor_data\" and r[\"_field\"] == \"co2\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\")\n" + " |> last()\n" + "plugInStatus = from(bucket: \"%s\")\n" + " |> range(start: -1h)\n" + " |> filter(fn: (r) => r[\"_measurement\"] == \"sensor_data\" and r[\"_field\"] == \"plugInStatus\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\")\n" + " |> last()\n" + "battery = from(bucket: \"%s\")\n" + " |> range(start: -1h)\n" + " |> filter(fn: (r) => r[\"_measurement\"] == \"sensor_data\" and r[\"_field\"] == \"battery\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\")\n" + " |> last()\n" + "union(tables: [temperature, humidity, co2, plugInStatus, battery])\n" + " |> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")\n" + " |> yield(name: \"latest_data\")", bucketName, deviceId, roads, bucketName, deviceId, roads, bucketName, deviceId, roads, bucketName, deviceId, roads, bucketName, deviceId, roads);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* 根据 deviceId、roads 和时间范围查询数据
|
|
|
*
|
|
@@ -84,99 +77,131 @@ public class JfcloudSensorDataService extends JfcloudFluxDataService<SensorData>
|
|
|
if (endTimeStr.length() == 10) {
|
|
|
endTimeStr += " 23:59:59";
|
|
|
}
|
|
|
- // 日期时间字符串解析格式
|
|
|
- DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
|
|
|
- .withZone(ZoneId.of("UTC"));
|
|
|
-
|
|
|
- // 将传入的日期时间字符串解析为 Instant 对象
|
|
|
- Instant startTime = Instant.from(inputFormatter.parse(startTimeStr));
|
|
|
- Instant endTime = Instant.from(inputFormatter.parse(endTimeStr));
|
|
|
-
|
|
|
- // 格式化 Instant 为 InfluxDB 可以接受的字符串格式
|
|
|
- DateTimeFormatter formatter = DateTimeFormatter.ISO_INSTANT.withZone(ZoneId.of("UTC"));
|
|
|
- String startTimeFormatted = formatter.format(startTime);
|
|
|
- String endTimeFormatted = formatter.format(endTime);
|
|
|
-
|
|
|
- // 构建Flux查询
|
|
|
-// String query = String.format(
|
|
|
-// "from(bucket: \"%s\") |> range(start: %s, stop: %s) " +
|
|
|
-// "|> filter(fn: (r) => r._measurement == \"sensor_data\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%d\") " +
|
|
|
-// "|> sort(columns: [\"_time\"], desc: true) " +
|
|
|
-// "|> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\") " +
|
|
|
-// "|> keep(columns: [\"_time\", \"temperature\", \"humidity\", \"co2\", \"battery\", \"plugInStatus\", \"location\", \"longitude\", \"latitude\", \"device_id\", \"roads\", \"modelName\", \"create_time\"])",
|
|
|
-// getBucketName(), startTimeFormatted, endTimeFormatted, deviceId, roads
|
|
|
-// );
|
|
|
- String query = String.format(
|
|
|
- "from(bucket: \"%s\") " +
|
|
|
- "|> range(start: %s, stop: %s) " +
|
|
|
- "|> filter(fn: (r) => r._measurement == \"sensor_data\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%d\") " +
|
|
|
- "|> aggregateWindow(every: 1h, fn: mean, createEmpty: false) " + // 按小时聚合,使用平均值
|
|
|
- "|> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\") " +
|
|
|
- "|> keep(columns: [\"_time\", \"temperature\", \"humidity\", \"co2\", \"battery\", \"plugInStatus\", \"location\", \"longitude\", \"latitude\", \"device_id\", \"roads\", \"modelName\", \"create_time\"])",
|
|
|
- getBucketName(), startTimeFormatted, endTimeFormatted, deviceId, roads
|
|
|
- );
|
|
|
-
|
|
|
+ String startTimeFormatted = convertToISO8601(startTimeStr);
|
|
|
+ String endTimeFormatted = convertToISO8601(endTimeStr);
|
|
|
+ // Determine the aggregation window (by day or by hour)
|
|
|
+ String aggregationWindow = determineAggregationWindow(startTimeFormatted, endTimeFormatted);
|
|
|
+ String query = buildRangeTimeFluxQuery(getBucketName(), deviceId, roads + "", startTimeFormatted, endTimeFormatted, aggregationWindow);
|
|
|
// 执行查询
|
|
|
QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
List<FluxTable> results = queryApi.query(query);
|
|
|
|
|
|
// 将查询结果转换为 SensorData 实体并返回
|
|
|
- return results.stream()
|
|
|
- .flatMap(table -> table.getRecords().stream())
|
|
|
- .map(fluxRecord -> mapRecordToEntity(fluxRecord,getEntityClass())) // 转换为 SensorData 实体
|
|
|
+ return results.stream().flatMap(table -> table.getRecords().stream()).map(fluxRecord -> mapRecordToEntity(fluxRecord, getEntityClass())) // 转换为 SensorData 实体
|
|
|
.collect(Collectors.toList()); // 返回多条数据
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
- * 根据时间范围、设备ID、路数和 bucket 查询传感器数据,并筛选需要的字段
|
|
|
+ * Builds the Flux query dynamically based on the provided parameters.
|
|
|
*
|
|
|
- * @param startTime 起始时间
|
|
|
- * @param endTime 结束时间
|
|
|
- * @param deviceId 设备ID
|
|
|
- * @param roads 路数
|
|
|
- * @param fields 查询的字段列表,可以包含 "temperature", "humidity", "co2", "battery", "plugInStatus", etc.
|
|
|
- * @return 查询到的传感器数据列表
|
|
|
+ * @param bucketName The name of the InfluxDB bucket.
|
|
|
+ * @param deviceId The device ID to filter by.
|
|
|
+ * @param roads The roads field to filter by.
|
|
|
+ * @param startTime The start time of the query (ISO format).
|
|
|
+ * @param stopTime The end time of the query (ISO format).
|
|
|
+ * @return The Flux query as a string.
|
|
|
*/
|
|
|
- public List<SensorData> queryDataByTimeAndDevice(String startTime, String endTime, String deviceId, Integer roads, List<String> fields) {
|
|
|
- // 构建查询字段的条件
|
|
|
- String fieldsToSelect = fields.isEmpty() ? "*" : String.join(",", fields);
|
|
|
- // 构建Flux查询语句
|
|
|
- String query = String.format(
|
|
|
- "from(bucket: \"%s\") |> range(start: %s, stop: %s) " +
|
|
|
- "|> filter(fn: (r) => r._measurement == \"sensor_data\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%d\") " +
|
|
|
- "|> sort(columns: [\"_time\"], desc: true) " +
|
|
|
- "|> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\") " +
|
|
|
- "|> keep(columns: [\"_time\", %s])",
|
|
|
- getBucketName(), startTime, endTime, deviceId, roads, fieldsToSelect
|
|
|
+ public static String buildRangeTimeFluxQuery(String bucketName, String deviceId, String roads, String startTime, String stopTime, String aggregationWindow) {
|
|
|
+ return String.format(
|
|
|
+ "temperature = from(bucket: \"%s\")\n" +
|
|
|
+ " |> range(start: time(v: \"%s\"), stop: time(v: \"%s\"))\n" +
|
|
|
+ " |> filter(fn: (r) => r[\"_measurement\"] == \"sensor_data\" and r[\"_field\"] == \"temperature\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\")\n" +
|
|
|
+ " |> aggregateWindow(every: %s, fn: mean, createEmpty: false)\n" +
|
|
|
+ "humidity = from(bucket: \"%s\")\n" +
|
|
|
+ " |> range(start: time(v: \"%s\"), stop: time(v: \"%s\"))\n" +
|
|
|
+ " |> filter(fn: (r) => r[\"_measurement\"] == \"sensor_data\" and r[\"_field\"] == \"humidity\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\")\n" +
|
|
|
+ " |> aggregateWindow(every: %s, fn: mean, createEmpty: false)\n" +
|
|
|
+ "co2 = from(bucket: \"%s\")\n" +
|
|
|
+ " |> range(start: time(v: \"%s\"), stop: time(v: \"%s\"))\n" +
|
|
|
+ " |> filter(fn: (r) => r[\"_measurement\"] == \"sensor_data\" and r[\"_field\"] == \"co2\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\")\n" +
|
|
|
+ " |> aggregateWindow(every: %s, fn: mean, createEmpty: false)\n" +
|
|
|
+ "union(tables: [temperature, humidity, co2])\n" +
|
|
|
+ " |> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")\n" +
|
|
|
+ " |> yield(name: \"latest_data\")",
|
|
|
+ bucketName, startTime, stopTime, deviceId, roads, aggregationWindow,
|
|
|
+ bucketName, startTime, stopTime, deviceId, roads, aggregationWindow,
|
|
|
+ bucketName, startTime, stopTime, deviceId, roads, aggregationWindow
|
|
|
);
|
|
|
+ }
|
|
|
|
|
|
- // 查询InfluxDB并返回结果
|
|
|
- QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
- List<FluxTable> results = queryApi.query(query);
|
|
|
-
|
|
|
- // 将查询结果转换为 SensorData 实体
|
|
|
- return results.stream()
|
|
|
- .flatMap(table -> table.getRecords().stream())
|
|
|
- .map(fluxRecord -> mapRecordToEntity(fluxRecord,getEntityClass())) // 转换为 SensorData 实体
|
|
|
- .collect(Collectors.toList()); // 返回多条数据
|
|
|
+ public static String determineAggregationWindow(String startTime, String stopTime) {
|
|
|
+ Instant start = Instant.parse(startTime);
|
|
|
+ Instant stop = Instant.parse(stopTime);
|
|
|
+ // Calculate the duration between start and stop
|
|
|
+ Duration duration = Duration.between(start, stop);
|
|
|
+ // If the duration is more than 30 days, return "1w" (weekly aggregation)
|
|
|
+ if (duration.toDays() > 30) {
|
|
|
+ return "1w"; // Weekly aggregation
|
|
|
+ }
|
|
|
+ // If the duration is more than 1 day but less than or equal to 30 days, return "1d" (daily aggregation)
|
|
|
+ else if (duration.toDays() > 1) {
|
|
|
+ return "1d"; // Daily aggregation
|
|
|
+ } else {
|
|
|
+ // If the duration is less than or equal to 1 day, return "1h" (hourly aggregation)
|
|
|
+ return "1h"; // Hourly aggregation
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 根据 deviceId、roads、时间范围和查询条件查询数据
|
|
|
+ * Converts a time string in "yyyy-MM-dd HH:mm:ss" format to ISO 8601 format.
|
|
|
*
|
|
|
- * @param deviceId 设备 ID
|
|
|
- * @param roads 路数
|
|
|
- * @param startTime 查询开始时间
|
|
|
- * @param endTime 查询结束时间
|
|
|
- * @param queryConditions 查询条件列表
|
|
|
- * @return 查询到的传感器数据列表
|
|
|
+ * @param time The time string in "yyyy-MM-dd HH:mm:ss" format.
|
|
|
+ * @return The ISO 8601 formatted time string.
|
|
|
*/
|
|
|
- public List<SensorData> querySensorData(String deviceId, int roads, Instant startTime, Instant endTime, List<QueryCondition> queryConditions) {
|
|
|
- String fluxQuery = buildFluxQuery(deviceId, roads, startTime, endTime, queryConditions, SensorData.class);
|
|
|
- QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
- List<FluxTable> tables = queryApi.query(fluxQuery);
|
|
|
- // 将查询结果映射为 SensorData 实体
|
|
|
- return tables.stream().flatMap(table -> table.getRecords().stream()).map(fluxRecord -> mapRecordToEntity(fluxRecord,getEntityClass())).collect(Collectors.toList());
|
|
|
+ public static String convertToISO8601(String time) {
|
|
|
+ // Define the input and output formats
|
|
|
+ DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
+ DateTimeFormatter outputFormatter = DateTimeFormatter.ISO_DATE_TIME;
|
|
|
+ // Parse the input time string to LocalDateTime (in system default zone)
|
|
|
+ LocalDateTime localDateTime = LocalDateTime.parse(time, inputFormatter);
|
|
|
+
|
|
|
+ // Convert the LocalDateTime to Instant (UTC time zone)
|
|
|
+ Instant instant = localDateTime.atZone(ZoneOffset.UTC).toInstant();
|
|
|
+
|
|
|
+ // Format the Instant to ISO 8601
|
|
|
+ return outputFormatter.format(instant.atZone(ZoneOffset.UTC));
|
|
|
}
|
|
|
+
|
|
|
+// /**
|
|
|
+// * 根据时间范围、设备ID、路数和 bucket 查询传感器数据,并筛选需要的字段
|
|
|
+// *
|
|
|
+// * @param startTime 起始时间
|
|
|
+// * @param endTime 结束时间
|
|
|
+// * @param deviceId 设备ID
|
|
|
+// * @param roads 路数
|
|
|
+// * @param fields 查询的字段列表,可以包含 "temperature", "humidity", "co2", "battery", "plugInStatus", etc.
|
|
|
+// * @return 查询到的传感器数据列表
|
|
|
+// */
|
|
|
+// public List<SensorData> queryDataByTimeAndDevice(String startTime, String endTime, String deviceId, Integer roads, List<String> fields) {
|
|
|
+// // 构建查询字段的条件
|
|
|
+// String fieldsToSelect = fields.isEmpty() ? "*" : String.join(",", fields);
|
|
|
+// // 构建Flux查询语句
|
|
|
+// String query = String.format("from(bucket: \"%s\") |> range(start: %s, stop: %s) " + "|> filter(fn: (r) => r._measurement == \"sensor_data\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%d\") " + "|> sort(columns: [\"_time\"], desc: true) " + "|> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\") " + "|> keep(columns: [\"_time\", %s])", getBucketName(), startTime, endTime, deviceId, roads, fieldsToSelect);
|
|
|
+//
|
|
|
+// // 查询InfluxDB并返回结果
|
|
|
+// QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
+// List<FluxTable> results = queryApi.query(query);
|
|
|
+//
|
|
|
+// // 将查询结果转换为 SensorData 实体
|
|
|
+// return results.stream().flatMap(table -> table.getRecords().stream()).map(fluxRecord -> mapRecordToEntity(fluxRecord, getEntityClass())) // 转换为 SensorData 实体
|
|
|
+// .collect(Collectors.toList()); // 返回多条数据
|
|
|
+// }
|
|
|
+//
|
|
|
+// /**
|
|
|
+// * 根据 deviceId、roads、时间范围和查询条件查询数据
|
|
|
+// *
|
|
|
+// * @param deviceId 设备 ID
|
|
|
+// * @param roads 路数
|
|
|
+// * @param startTime 查询开始时间
|
|
|
+// * @param endTime 查询结束时间
|
|
|
+// * @param queryConditions 查询条件列表
|
|
|
+// * @return 查询到的传感器数据列表
|
|
|
+// */
|
|
|
+// public List<SensorData> querySensorData(String deviceId, int roads, Instant startTime, Instant endTime, List<QueryCondition> queryConditions) {
|
|
|
+// String fluxQuery = buildFluxQuery(deviceId, roads, startTime, endTime, queryConditions, SensorData.class);
|
|
|
+// QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
+// List<FluxTable> tables = queryApi.query(fluxQuery);
|
|
|
+// // 将查询结果映射为 SensorData 实体
|
|
|
+// return tables.stream().flatMap(table -> table.getRecords().stream()).map(fluxRecord -> mapRecordToEntity(fluxRecord, getEntityClass())).collect(Collectors.toList());
|
|
|
+// }
|
|
|
}
|