|
@@ -1,157 +1,145 @@
|
|
-package vip.xiaonuo.coldchain.core.bean.influxdb;
|
|
|
|
-
|
|
|
|
-import com.github.jfcloud.influxdb.config.JfcloudInfluxDB2Properties;
|
|
|
|
-import com.github.jfcloud.influxdb.flux.QueryCondition;
|
|
|
|
-import com.github.jfcloud.influxdb.service.JfcloudInfluxDBService;
|
|
|
|
-import com.influxdb.annotations.Column;
|
|
|
|
-import com.influxdb.annotations.Measurement;
|
|
|
|
-import com.influxdb.client.QueryApi;
|
|
|
|
-import com.influxdb.query.FluxTable;
|
|
|
|
-import lombok.RequiredArgsConstructor;
|
|
|
|
-import org.springframework.stereotype.Component;
|
|
|
|
-import vip.xiaonuo.coldchain.core.config.JfcloudColdChainConstants;
|
|
|
|
-
|
|
|
|
-import java.time.Instant;
|
|
|
|
-import java.util.Arrays;
|
|
|
|
-import java.util.List;
|
|
|
|
-import java.util.stream.Collectors;
|
|
|
|
-
|
|
|
|
-/**
|
|
|
|
- * @description: Service for querying SensorData from InfluxDB
|
|
|
|
- */
|
|
|
|
-@Component
|
|
|
|
-@RequiredArgsConstructor
|
|
|
|
-@Deprecated
|
|
|
|
-public class SensorDataService {
|
|
|
|
-
|
|
|
|
- private final JfcloudInfluxDBService jfcloudInfluxDBService;
|
|
|
|
- private final JfcloudInfluxDB2Properties jfcloudInfluxDB2Properties;
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 获取 measurement(表名)通过反射
|
|
|
|
- *
|
|
|
|
- * @param clazz 要查询的实体类
|
|
|
|
- * @return measurement(表名)
|
|
|
|
- */
|
|
|
|
- public String getMeasurement(Class<?> clazz) {
|
|
|
|
- Measurement measurement = clazz.getAnnotation(Measurement.class);
|
|
|
|
- if (measurement != null) {
|
|
|
|
- return measurement.name();
|
|
|
|
- }
|
|
|
|
- // 默认表名
|
|
|
|
- return JfcloudColdChainConstants.INFLUXDB_DEFAULT_MEASUREMENT_NAME;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 获取所有的字段名称,通过反射
|
|
|
|
- *
|
|
|
|
- * @param clazz 要查询的实体类
|
|
|
|
- * @return 字段名称列表
|
|
|
|
- */
|
|
|
|
- public List<String> getFieldNames(Class<?> clazz) {
|
|
|
|
- return Arrays.stream(clazz.getDeclaredFields()).filter(field -> field.isAnnotationPresent(Column.class)).map(field -> field.getAnnotation(Column.class).name()).collect(Collectors.toList());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 构建 Flux 查询语句的公共方法
|
|
|
|
- *
|
|
|
|
- * @param deviceId 设备 ID
|
|
|
|
- * @param roads 路数
|
|
|
|
- * @param startTime 查询开始时间
|
|
|
|
- * @param endTime 查询结束时间
|
|
|
|
- * @param queryConditions 其他查询条件(如温度、湿度等)
|
|
|
|
- * @param sensorDataClass SensorData 类
|
|
|
|
- * @return 构建的 Flux 查询语句
|
|
|
|
- */
|
|
|
|
- public String buildFluxQuery(String deviceId, int roads, Instant startTime, Instant endTime, List<QueryCondition> queryConditions, Class<?> sensorDataClass) {
|
|
|
|
- StringBuilder fluxQuery = new StringBuilder();
|
|
|
|
- // 获取 measurement(表名)
|
|
|
|
- String measurement = getMeasurement(sensorDataClass);
|
|
|
|
- // 获取字段名称列表
|
|
|
|
- List<String> fieldNames = getFieldNames(sensorDataClass);
|
|
|
|
- fluxQuery.append("from(bucket: \"").append(getBucketName()).append("\") ") // 使用动态获取的桶名
|
|
|
|
- .append("|> range(start: ").append(startTime).append(", stop: ").append(endTime).append(") ") // 设置时间范围
|
|
|
|
- .append("|> filter(fn: (r) => r._measurement == \"").append(measurement).append("\" and r.device_id == \"").append(deviceId).append("\" and r.roads == ").append(roads).append(") ");
|
|
|
|
- // 添加自定义查询条件
|
|
|
|
- if (queryConditions != null) {
|
|
|
|
- for (QueryCondition condition : queryConditions) {
|
|
|
|
- fluxQuery.append("|> filter(fn: (r) => r._field == \"").append(condition.getField()).append("\" and r._value ").append(condition.getOperator().getOperator()).append(" ").append(condition.getValue()).append(") ");
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- fluxQuery.append("|> sort(columns: [\"_time\"], desc: true)"); // 按时间降序排列
|
|
|
|
- return fluxQuery.toString();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 根据 deviceId 和 roads 查询最新的数据
|
|
|
|
- *
|
|
|
|
- * @param deviceId 设备 ID
|
|
|
|
- * @param roads 路数
|
|
|
|
- * @return 最新的传感器数据
|
|
|
|
- */
|
|
|
|
- public SensorData queryLatestDataByDeviceIdAndRoads(String deviceId, Integer roads) {
|
|
|
|
- // 构建 Flux 查询,获取最新一条记录
|
|
|
|
|
|
+//package vip.xiaonuo.coldchain.core.bean.influxdb;
|
|
|
|
+//
|
|
|
|
+//import com.github.jfcloud.influxdb.config.JfcloudInfluxDB2Properties;
|
|
|
|
+//import com.github.jfcloud.influxdb.flux.QueryCondition;
|
|
|
|
+//import com.github.jfcloud.influxdb.service.JfcloudInfluxDBService;
|
|
|
|
+//import com.influxdb.annotations.Column;
|
|
|
|
+//import com.influxdb.annotations.Measurement;
|
|
|
|
+//import com.influxdb.client.QueryApi;
|
|
|
|
+//import com.influxdb.query.FluxTable;
|
|
|
|
+//import lombok.RequiredArgsConstructor;
|
|
|
|
+//import org.springframework.stereotype.Component;
|
|
|
|
+//import vip.xiaonuo.coldchain.core.config.JfcloudColdChainConstants;
|
|
|
|
+//
|
|
|
|
+//import java.time.Instant;
|
|
|
|
+//import java.util.Arrays;
|
|
|
|
+//import java.util.List;
|
|
|
|
+//import java.util.stream.Collectors;
|
|
|
|
+//
|
|
|
|
+///**
|
|
|
|
+// * @description: Service for querying SensorData from InfluxDB
|
|
|
|
+// */
|
|
|
|
+//@Component
|
|
|
|
+//@RequiredArgsConstructor
|
|
|
|
+//@Deprecated
|
|
|
|
+//public class SensorDataService {
|
|
|
|
+//
|
|
|
|
+// private final JfcloudInfluxDBService jfcloudInfluxDBService;
|
|
|
|
+// private final JfcloudInfluxDB2Properties jfcloudInfluxDB2Properties;
|
|
|
|
+//
|
|
|
|
+// /**
|
|
|
|
+// * 获取 measurement(表名)通过反射
|
|
|
|
+// *
|
|
|
|
+// * @param clazz 要查询的实体类
|
|
|
|
+// * @return measurement(表名)
|
|
|
|
+// */
|
|
|
|
+// public String getMeasurement(Class<?> clazz) {
|
|
|
|
+// Measurement measurement = clazz.getAnnotation(Measurement.class);
|
|
|
|
+// if (measurement != null) {
|
|
|
|
+// return measurement.name();
|
|
|
|
+// }
|
|
|
|
+// // 默认表名
|
|
|
|
+// return JfcloudColdChainConstants.INFLUXDB_DEFAULT_MEASUREMENT_NAME;
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// /**
|
|
|
|
+// * 获取所有的字段名称,通过反射
|
|
|
|
+// *
|
|
|
|
+// * @param clazz 要查询的实体类
|
|
|
|
+// * @return 字段名称列表
|
|
|
|
+// */
|
|
|
|
+// public List<String> getFieldNames(Class<?> clazz) {
|
|
|
|
+// return Arrays.stream(clazz.getDeclaredFields()).filter(field -> field.isAnnotationPresent(Column.class)).map(field -> field.getAnnotation(Column.class).name()).collect(Collectors.toList());
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// /**
|
|
|
|
+// * 构建 Flux 查询语句的公共方法
|
|
|
|
+// *
|
|
|
|
+// * @param deviceId 设备 ID
|
|
|
|
+// * @param roads 路数
|
|
|
|
+// * @param startTime 查询开始时间
|
|
|
|
+// * @param endTime 查询结束时间
|
|
|
|
+// * @param queryConditions 其他查询条件(如温度、湿度等)
|
|
|
|
+// * @param sensorDataClass SensorData 类
|
|
|
|
+// * @return 构建的 Flux 查询语句
|
|
|
|
+// */
|
|
|
|
+// public String buildFluxQuery(String deviceId, int roads, Instant startTime, Instant endTime, List<QueryCondition> queryConditions, Class<?> sensorDataClass) {
|
|
|
|
+// StringBuilder fluxQuery = new StringBuilder();
|
|
|
|
+// // 获取 measurement(表名)
|
|
|
|
+// String measurement = getMeasurement(sensorDataClass);
|
|
|
|
+// // 获取字段名称列表
|
|
|
|
+// List<String> fieldNames = getFieldNames(sensorDataClass);
|
|
|
|
+// fluxQuery.append("from(bucket: \"").append(getBucketName()).append("\") ") // 使用动态获取的桶名
|
|
|
|
+// .append("|> range(start: ").append(startTime).append(", stop: ").append(endTime).append(") ") // 设置时间范围
|
|
|
|
+// .append("|> filter(fn: (r) => r._measurement == \"").append(measurement).append("\" and r.device_id == \"").append(deviceId).append("\" and r.roads == ").append(roads).append(") ");
|
|
|
|
+// // 添加自定义查询条件
|
|
|
|
+// if (queryConditions != null) {
|
|
|
|
+// for (QueryCondition condition : queryConditions) {
|
|
|
|
+// fluxQuery.append("|> filter(fn: (r) => r._field == \"").append(condition.getField()).append("\" and r._value ").append(condition.getOperator().getOperator()).append(" ").append(condition.getValue()).append(") ");
|
|
|
|
+// }
|
|
|
|
+// }
|
|
|
|
+// fluxQuery.append("|> sort(columns: [\"_time\"], desc: true)"); // 按时间降序排列
|
|
|
|
+// return fluxQuery.toString();
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// /**
|
|
|
|
+// * 根据 deviceId 和 roads 查询最新的数据
|
|
|
|
+// *
|
|
|
|
+// * @param deviceId 设备 ID
|
|
|
|
+// * @param roads 路数
|
|
|
|
+// * @return 最新的传感器数据
|
|
|
|
+// */
|
|
|
|
+// public SensorData queryLatestDataByDeviceIdAndRoads(String deviceId, Integer roads) {
|
|
// String fluxQuery = String.format(
|
|
// String fluxQuery = String.format(
|
|
// "from(bucket: \"%s\") " +
|
|
// "from(bucket: \"%s\") " +
|
|
-// "|> range(start: -30d) " + // 查询过去 30 天的数据
|
|
|
|
|
|
+// "|> range(start: -30d) " +
|
|
// "|> filter(fn: (r) => r._measurement == \"%s\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\") " +
|
|
// "|> filter(fn: (r) => r._measurement == \"%s\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\") " +
|
|
-// "|> sort(columns: [\"_time\"], desc: true) " + // 按时间降序排序
|
|
|
|
-// "|> limit(n: 1)", // 限制查询结果为 1 条
|
|
|
|
-// getBucketName(), // 动态获取桶名
|
|
|
|
-// getMeasurement(SensorData.class), // 动态获取 measurement 名称
|
|
|
|
-// deviceId, // 设备 ID
|
|
|
|
-// roads // 路数
|
|
|
|
|
|
+// "|> sort(columns: [\"_time\"], desc: true) " +
|
|
|
|
+// "|> limit(n: 1) " +
|
|
|
|
+// "|> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\") " +
|
|
|
|
+// "|> keep(columns: [\"_time\", \"temperature\", \"humidity\", \"co2\", \"battery\", \"plugInStatus\", \"location\", \"longitude\", \"latitude\", \"device_id\", \"roads\", \"modelName\"])",
|
|
|
|
+// getBucketName(),
|
|
|
|
+// getMeasurement(SensorData.class),
|
|
|
|
+// deviceId,
|
|
|
|
+// roads
|
|
// );
|
|
// );
|
|
- String fluxQuery = String.format(
|
|
|
|
- "from(bucket: \"%s\") " +
|
|
|
|
- "|> range(start: -30d) " +
|
|
|
|
- "|> filter(fn: (r) => r._measurement == \"%s\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\") " +
|
|
|
|
- "|> sort(columns: [\"_time\"], desc: true) " +
|
|
|
|
- "|> limit(n: 1) " +
|
|
|
|
- "|> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\") " +
|
|
|
|
- "|> keep(columns: [\"_time\", \"temperature\", \"humidity\", \"co2\", \"battery\", \"plugInStatus\", \"location\", \"longitude\", \"latitude\", \"device_id\", \"roads\", \"modelName\"])",
|
|
|
|
- getBucketName(),
|
|
|
|
- getMeasurement(SensorData.class),
|
|
|
|
- deviceId,
|
|
|
|
- roads
|
|
|
|
- );
|
|
|
|
- QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
|
- List<FluxTable> tables = queryApi.query(fluxQuery);
|
|
|
|
-
|
|
|
|
- // 将查询结果映射为 SensorData 实体
|
|
|
|
- return tables.stream()
|
|
|
|
- .flatMap(table -> table.getRecords().stream()) // 拉平所有记录
|
|
|
|
- .map(SensorData::mapToSensorData) // 将每个记录映射为 SensorData 实体
|
|
|
|
- .findFirst() // 只取第一条记录(即最新一条记录)
|
|
|
|
- .orElse(null); // 如果没有数据,返回 null
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 根据 deviceId、roads 和时间范围查询数据
|
|
|
|
- *
|
|
|
|
- * @param deviceId 设备 ID
|
|
|
|
- * @param roads 路数
|
|
|
|
- * @param startTime 查询开始时间
|
|
|
|
- * @param endTime 查询结束时间
|
|
|
|
- * @return 查询到的传感器数据列表
|
|
|
|
- */
|
|
|
|
- public List<SensorData> queryDataByDeviceIdAndRoads(String deviceId, Integer roads, String startTime, String endTime) {
|
|
|
|
- String query = String.format("SELECT * FROM \"%s\" WHERE \"device_id\" = '%s' AND \"roads\" = %d AND time >= '%s' AND time <= '%s' ORDER BY time DESC", getMeasurement(SensorData.class), deviceId, roads, startTime, endTime);
|
|
|
|
- QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
|
- List<FluxTable> results = queryApi.query(query);
|
|
|
|
-
|
|
|
|
- // 将查询结果转换为 SensorData 实体
|
|
|
|
- return results.stream().flatMap(table -> table.getRecords().stream()).map(SensorData::mapToSensorData).collect(Collectors.toList());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 获取桶名(通过配置类获取或返回默认桶名)
|
|
|
|
- *
|
|
|
|
- * @return 桶名
|
|
|
|
- */
|
|
|
|
- public String getBucketName() {
|
|
|
|
- return jfcloudInfluxDB2Properties.getBucket() == null ? JfcloudColdChainConstants.INFLUXDB_DEFAULT_BUCKET_NAME : jfcloudInfluxDB2Properties.getBucket();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
-}
|
|
|
|
|
|
+// QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
|
+// List<FluxTable> tables = queryApi.query(fluxQuery);
|
|
|
|
+//
|
|
|
|
+// // 将查询结果映射为 SensorData 实体
|
|
|
|
+// return tables.stream()
|
|
|
|
+// .flatMap(table -> table.getRecords().stream()) // 拉平所有记录
|
|
|
|
+// .map(fluxRecord -> mapRecordToEntity(fluxRecord,getEntityClass())) // 将每个记录映射为 SensorData 实体
|
|
|
|
+// .findFirst() // 只取第一条记录(即最新一条记录)
|
|
|
|
+// .orElse(null); // 如果没有数据,返回 null
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// /**
|
|
|
|
+// * 根据 deviceId、roads 和时间范围查询数据
|
|
|
|
+// *
|
|
|
|
+// * @param deviceId 设备 ID
|
|
|
|
+// * @param roads 路数
|
|
|
|
+// * @param startTime 查询开始时间
|
|
|
|
+// * @param endTime 查询结束时间
|
|
|
|
+// * @return 查询到的传感器数据列表
|
|
|
|
+// */
|
|
|
|
+// public List<SensorData> queryDataByDeviceIdAndRoads(String deviceId, Integer roads, String startTime, String endTime) {
|
|
|
|
+// String query = String.format("SELECT * FROM \"%s\" WHERE \"device_id\" = '%s' AND \"roads\" = %d AND time >= '%s' AND time <= '%s' ORDER BY time DESC", getMeasurement(SensorData.class), deviceId, roads, startTime, endTime);
|
|
|
|
+// QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
|
+// List<FluxTable> results = queryApi.query(query);
|
|
|
|
+//
|
|
|
|
+// // 将查询结果转换为 SensorData 实体
|
|
|
|
+// return results.stream().flatMap(table -> table.getRecords().stream()).map(SensorData::mapToSensorData).collect(Collectors.toList());
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+//
|
|
|
|
+// /**
|
|
|
|
+// * 获取桶名(通过配置类获取或返回默认桶名)
|
|
|
|
+// *
|
|
|
|
+// * @return 桶名
|
|
|
|
+// */
|
|
|
|
+// public String getBucketName() {
|
|
|
|
+// return jfcloudInfluxDB2Properties.getBucket() == null ? JfcloudColdChainConstants.INFLUXDB_DEFAULT_BUCKET_NAME : jfcloudInfluxDB2Properties.getBucket();
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+//}
|