|
@@ -63,24 +63,19 @@ public class SensorDataService {
|
|
*/
|
|
*/
|
|
public String buildFluxQuery(String deviceId, int roads, Instant startTime, Instant endTime, List<SensorQueryCondition> queryConditions, Class<?> sensorDataClass) {
|
|
public String buildFluxQuery(String deviceId, int roads, Instant startTime, Instant endTime, List<SensorQueryCondition> queryConditions, Class<?> sensorDataClass) {
|
|
StringBuilder fluxQuery = new StringBuilder();
|
|
StringBuilder fluxQuery = new StringBuilder();
|
|
-
|
|
|
|
// 获取 measurement(表名)
|
|
// 获取 measurement(表名)
|
|
String measurement = getMeasurement(sensorDataClass);
|
|
String measurement = getMeasurement(sensorDataClass);
|
|
-
|
|
|
|
// 获取字段名称列表
|
|
// 获取字段名称列表
|
|
List<String> fieldNames = getFieldNames(sensorDataClass);
|
|
List<String> fieldNames = getFieldNames(sensorDataClass);
|
|
-
|
|
|
|
fluxQuery.append("from(bucket: \"").append(getBucketName()).append("\") ") // 使用动态获取的桶名
|
|
fluxQuery.append("from(bucket: \"").append(getBucketName()).append("\") ") // 使用动态获取的桶名
|
|
.append("|> range(start: ").append(startTime).append(", stop: ").append(endTime).append(") ") // 设置时间范围
|
|
.append("|> range(start: ").append(startTime).append(", stop: ").append(endTime).append(") ") // 设置时间范围
|
|
.append("|> filter(fn: (r) => r._measurement == \"").append(measurement).append("\" and r.device_id == \"").append(deviceId).append("\" and r.roads == ").append(roads).append(") ");
|
|
.append("|> filter(fn: (r) => r._measurement == \"").append(measurement).append("\" and r.device_id == \"").append(deviceId).append("\" and r.roads == ").append(roads).append(") ");
|
|
-
|
|
|
|
// 添加自定义查询条件
|
|
// 添加自定义查询条件
|
|
if (queryConditions != null) {
|
|
if (queryConditions != null) {
|
|
for (SensorQueryCondition condition : queryConditions) {
|
|
for (SensorQueryCondition condition : queryConditions) {
|
|
fluxQuery.append("|> filter(fn: (r) => r._field == \"").append(condition.getField()).append("\" and r._value ").append(condition.getOperator().getOperator()).append(" ").append(condition.getValue()).append(") ");
|
|
fluxQuery.append("|> filter(fn: (r) => r._field == \"").append(condition.getField()).append("\" and r._value ").append(condition.getOperator().getOperator()).append(" ").append(condition.getValue()).append(") ");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
fluxQuery.append("|> sort(columns: [\"_time\"], desc: true)"); // 按时间降序排列
|
|
fluxQuery.append("|> sort(columns: [\"_time\"], desc: true)"); // 按时间降序排列
|
|
return fluxQuery.toString();
|
|
return fluxQuery.toString();
|
|
}
|
|
}
|
|
@@ -99,7 +94,6 @@ public class SensorDataService {
|
|
String fluxQuery = buildFluxQuery(deviceId, roads, startTime, endTime, queryConditions, SensorData.class);
|
|
String fluxQuery = buildFluxQuery(deviceId, roads, startTime, endTime, queryConditions, SensorData.class);
|
|
QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
List<FluxTable> tables = queryApi.query(fluxQuery);
|
|
List<FluxTable> tables = queryApi.query(fluxQuery);
|
|
-
|
|
|
|
// 将查询结果映射为 SensorData 实体
|
|
// 将查询结果映射为 SensorData 实体
|
|
return tables.stream().flatMap(table -> table.getRecords().stream()).map(SensorData::mapToSensorData).collect(Collectors.toList());
|
|
return tables.stream().flatMap(table -> table.getRecords().stream()).map(SensorData::mapToSensorData).collect(Collectors.toList());
|
|
}
|
|
}
|
|
@@ -113,13 +107,39 @@ public class SensorDataService {
|
|
*/
|
|
*/
|
|
public SensorData queryLatestDataByDeviceIdAndRoads(String deviceId, Integer roads) {
|
|
public SensorData queryLatestDataByDeviceIdAndRoads(String deviceId, Integer roads) {
|
|
// 构建 Flux 查询,获取最新一条记录
|
|
// 构建 Flux 查询,获取最新一条记录
|
|
- String fluxQuery = String.format("from(bucket: \"%s\") |> range(start: -30d) |> filter(fn: (r) => r._measurement == \"%s\" and r.device_id == \"%s\" and r.roads == %d) |> sort(columns: [\"_time\"], desc: true) |> limit(n: 1)", getBucketName(), getMeasurement(SensorData.class), deviceId, roads);
|
|
|
|
-
|
|
|
|
|
|
+// String fluxQuery = String.format(
|
|
|
|
+// "from(bucket: \"%s\") " +
|
|
|
|
+// "|> range(start: -30d) " + // 查询过去 30 天的数据
|
|
|
|
+// "|> filter(fn: (r) => r._measurement == \"%s\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\") " +
|
|
|
|
+// "|> sort(columns: [\"_time\"], desc: true) " + // 按时间降序排序
|
|
|
|
+// "|> limit(n: 1)", // 限制查询结果为 1 条
|
|
|
|
+// getBucketName(), // 动态获取桶名
|
|
|
|
+// getMeasurement(SensorData.class), // 动态获取 measurement 名称
|
|
|
|
+// deviceId, // 设备 ID
|
|
|
|
+// roads // 路数
|
|
|
|
+// );
|
|
|
|
+ String fluxQuery = String.format(
|
|
|
|
+ "from(bucket: \"%s\") " +
|
|
|
|
+ "|> range(start: -30d) " +
|
|
|
|
+ "|> filter(fn: (r) => r._measurement == \"%s\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\") " +
|
|
|
|
+ "|> sort(columns: [\"_time\"], desc: true) " +
|
|
|
|
+ "|> limit(n: 1) " +
|
|
|
|
+ "|> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\") " +
|
|
|
|
+ "|> keep(columns: [\"_time\", \"temperature\", \"humidity\", \"co2\", \"battery\", \"plugInStatus\", \"location\", \"longitude\", \"latitude\", \"device_id\", \"roads\", \"modelName\"])",
|
|
|
|
+ getBucketName(),
|
|
|
|
+ getMeasurement(SensorData.class),
|
|
|
|
+ deviceId,
|
|
|
|
+ roads
|
|
|
|
+ );
|
|
QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
List<FluxTable> tables = queryApi.query(fluxQuery);
|
|
List<FluxTable> tables = queryApi.query(fluxQuery);
|
|
|
|
|
|
// 将查询结果映射为 SensorData 实体
|
|
// 将查询结果映射为 SensorData 实体
|
|
- return tables.stream().flatMap(table -> table.getRecords().stream()).map(SensorData::mapToSensorData).findFirst().orElse(null); // 如果没有数据,返回 null
|
|
|
|
|
|
+ return tables.stream()
|
|
|
|
+ .flatMap(table -> table.getRecords().stream()) // 拉平所有记录
|
|
|
|
+ .map(SensorData::mapToSensorData) // 将每个记录映射为 SensorData 实体
|
|
|
|
+ .findFirst() // 只取第一条记录(即最新一条记录)
|
|
|
|
+ .orElse(null); // 如果没有数据,返回 null
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -135,7 +155,6 @@ public class SensorDataService {
|
|
String query = String.format("SELECT * FROM \"%s\" WHERE \"device_id\" = '%s' AND \"roads\" = %d AND time >= '%s' AND time <= '%s' ORDER BY time DESC", getMeasurement(SensorData.class), deviceId, roads, startTime, endTime);
|
|
String query = String.format("SELECT * FROM \"%s\" WHERE \"device_id\" = '%s' AND \"roads\" = %d AND time >= '%s' AND time <= '%s' ORDER BY time DESC", getMeasurement(SensorData.class), deviceId, roads, startTime, endTime);
|
|
QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
List<FluxTable> results = queryApi.query(query);
|
|
List<FluxTable> results = queryApi.query(query);
|
|
-
|
|
|
|
// 将查询结果转换为 SensorData 实体
|
|
// 将查询结果转换为 SensorData 实体
|
|
return results.stream().flatMap(table -> table.getRecords().stream()).map(SensorData::mapToSensorData).collect(Collectors.toList());
|
|
return results.stream().flatMap(table -> table.getRecords().stream()).map(SensorData::mapToSensorData).collect(Collectors.toList());
|
|
}
|
|
}
|