|
@@ -4,17 +4,13 @@ import com.github.jfcloud.influxdb.config.JfcloudInfluxDB2Properties;
|
|
|
import com.github.jfcloud.influxdb.flux.AggregationWindow;
|
|
|
import com.github.jfcloud.influxdb.flux.JfcloudFluxDataService;
|
|
|
import com.github.jfcloud.influxdb.service.JfcloudInfluxDBService;
|
|
|
-import com.github.jfcloud.influxdb.util.InfluxDBDateFormatter;
|
|
|
+import com.github.jfcloud.influxdb.util.TimeUtils;
|
|
|
import com.influxdb.client.QueryApi;
|
|
|
import com.influxdb.query.FluxTable;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.util.Assert;
|
|
|
import vip.xiaonuo.coldchain.core.bean.influxdb.SensorData;
|
|
|
|
|
|
-import java.time.Instant;
|
|
|
-import java.time.LocalDateTime;
|
|
|
-import java.time.ZoneOffset;
|
|
|
-import java.time.format.DateTimeFormatter;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.stream.Collectors;
|
|
@@ -67,74 +63,54 @@ public class JfcloudSensorDataService extends JfcloudFluxDataService<SensorData>
|
|
|
* @param field temperature/humidity/co2
|
|
|
* @return 查询到的传感器数据列表
|
|
|
*/
|
|
|
-
|
|
|
-// public List<SensorData> queryDataByDeviceIdAndRoads(String deviceId, Integer roads, String startTimeStr, String endTimeStr,String field) {
|
|
|
-// Assert.notNull(deviceId, "deviceId cannot be null");
|
|
|
-// Assert.notNull(roads, "roads cannot be null");
|
|
|
-// Assert.notNull(startTimeStr, "startTime cannot be null");
|
|
|
-// Assert.notNull(endTimeStr, "endTime cannot be null");
|
|
|
-// Assert.notNull(field, "field cannot be null");
|
|
|
-// //替换起始时间和结束时间2024/09/09 这种格式为2024-09-09
|
|
|
-// startTimeStr = InfluxDBDateFormatter.replaceDateFormat(startTimeStr);
|
|
|
-// endTimeStr = InfluxDBDateFormatter.replaceDateFormat(endTimeStr);
|
|
|
-// // 如果只有日期部分,则手动补充时间部分(00:00:00)
|
|
|
-// if (startTimeStr.length() == 10) {
|
|
|
-// startTimeStr += " 00:00:00";
|
|
|
-// }
|
|
|
-// if (endTimeStr.length() == 10) {
|
|
|
-// endTimeStr += " 23:59:59";
|
|
|
-// }
|
|
|
-// String startTimeFormatted = convertToISO8601(startTimeStr);
|
|
|
-// String endTimeFormatted = convertToISO8601(endTimeStr);
|
|
|
-// String aggregationWindow = FluxAggregationUtils.determineAggregationWindow(startTimeFormatted, endTimeFormatted);
|
|
|
-// String measurement = "sensor_data";
|
|
|
-// Map<String, String> filters = Map.of(
|
|
|
-// "device_id", deviceId,
|
|
|
-// "roads", roads.toString()
|
|
|
-// );
|
|
|
-// String query = FluxQueryBuilder.buildRangeQuery(getBucketName(), startTimeFormatted, endTimeFormatted, measurement, field, filters, aggregationWindow);
|
|
|
-// QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
-// List<FluxTable> results = queryApi.query(query);
|
|
|
-// return results.stream().flatMap(table -> table.getRecords().stream()).map(fluxRecord -> mapRecordToEntity(fluxRecord, getEntityClass())) // 转换为 SensorData 实体
|
|
|
-// .collect(Collectors.toList());
|
|
|
-// }
|
|
|
public List<SensorData> queryDataByDeviceIdAndRoads(String deviceId, Integer roads, String startTimeStr, String endTimeStr, String field, AggregationWindow aggregationWindow) {
|
|
|
Assert.notNull(deviceId, "deviceId cannot be null");
|
|
|
Assert.notNull(roads, "roads cannot be null");
|
|
|
Assert.notNull(startTimeStr, "startTime cannot be null");
|
|
|
Assert.notNull(endTimeStr, "endTime cannot be null");
|
|
|
Assert.notNull(field, "field cannot be null");
|
|
|
-
|
|
|
- // 替换日期格式,确保格式一致
|
|
|
- startTimeStr = InfluxDBDateFormatter.replaceDateFormat(startTimeStr);
|
|
|
- endTimeStr = InfluxDBDateFormatter.replaceDateFormat(endTimeStr);
|
|
|
-
|
|
|
- // 如果只有日期部分,补充时间部分为00:00:00和23:59:59
|
|
|
- if (startTimeStr.length() == 10) {
|
|
|
- startTimeStr += " 00:00:00";
|
|
|
- }
|
|
|
- if (endTimeStr.length() == 10) {
|
|
|
- endTimeStr += " 23:59:59";
|
|
|
- }
|
|
|
- // 转换为ISO8601格式
|
|
|
- String startTimeFormatted = convertToISO8601(startTimeStr);
|
|
|
- String endTimeFormatted = convertToISO8601(endTimeStr);
|
|
|
- String aggregationWindowA = aggregationWindow.getCode(); //FluxAggregationUtils.determineAggregationWindow(startTimeFormatted, endTimeFormatted);
|
|
|
+ // 格式化日期并转换为ISO8601格式
|
|
|
+ String startTimeFormatted = convertToISO8601(formatStartTime(startTimeStr));
|
|
|
+ String endTimeFormatted = convertToISO8601(formatEndTime(endTimeStr));
|
|
|
+ // 获取聚合窗口的代码
|
|
|
+ String aggregationWindowCode = aggregationWindow != null ? aggregationWindow.getCode() : "default";
|
|
|
+ // 构建查询语句
|
|
|
String measurement = "sensor_data"; // 数据表名称
|
|
|
Map<String, String> filters = Map.of(
|
|
|
"device_id", deviceId,
|
|
|
"roads", roads.toString()
|
|
|
);
|
|
|
- // 构建查询语句
|
|
|
- String query = FluxQueryBuilder.buildRangeQuery(getBucketName(), startTimeFormatted, endTimeFormatted, measurement, field, filters, aggregationWindowA);
|
|
|
+ String query = FluxQueryBuilder.buildRangeQuery(getBucketName(), startTimeFormatted, endTimeFormatted, measurement, field, filters, aggregationWindowCode);
|
|
|
+ // 执行查询并处理结果
|
|
|
QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
List<FluxTable> results = queryApi.query(query);
|
|
|
+
|
|
|
return results.stream()
|
|
|
.flatMap(table -> table.getRecords().stream())
|
|
|
.map(fluxRecord -> mapRecordToEntity(fluxRecord, getEntityClass()))
|
|
|
.collect(Collectors.toList());
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 格式化开始时间,若只有日期部分则补充时间部分为 00:00:00
|
|
|
+ */
|
|
|
+ private String formatStartTime(String startTimeStr) {
|
|
|
+ if (startTimeStr.length() == 10) {
|
|
|
+ return startTimeStr + " 00:00:00";
|
|
|
+ }
|
|
|
+ return startTimeStr;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 格式化结束时间,若只有日期部分则补充时间部分为 23:59:59
|
|
|
+ */
|
|
|
+ private String formatEndTime(String endTimeStr) {
|
|
|
+ if (endTimeStr.length() == 10) {
|
|
|
+ return endTimeStr + " 23:59:59";
|
|
|
+ }
|
|
|
+ return endTimeStr;
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
/**
|
|
|
* Converts a time string in "yyyy-MM-dd HH:mm:ss" format to ISO 8601 format.
|
|
@@ -143,57 +119,15 @@ public class JfcloudSensorDataService extends JfcloudFluxDataService<SensorData>
|
|
|
* @return The ISO 8601 formatted time string.
|
|
|
*/
|
|
|
public static String convertToISO8601(String time) {
|
|
|
- // Define the input and output formats
|
|
|
- DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
- DateTimeFormatter outputFormatter = DateTimeFormatter.ISO_DATE_TIME;
|
|
|
- // Parse the input time string to LocalDateTime (in system default zone)
|
|
|
- LocalDateTime localDateTime = LocalDateTime.parse(time, inputFormatter);
|
|
|
- // Convert the LocalDateTime to Instant (UTC time zone)
|
|
|
- Instant instant = localDateTime.atZone(ZoneOffset.UTC).toInstant();
|
|
|
- // Format the Instant to ISO 8601
|
|
|
- return outputFormatter.format(instant.atZone(ZoneOffset.UTC));
|
|
|
+ return TimeUtils.uTCPlus8ToUTC(time);
|
|
|
+// DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
+// DateTimeFormatter outputFormatter = DateTimeFormatter.ISO_DATE_TIME;
|
|
|
+// // Parse the input time string to LocalDateTime (in system default zone)
|
|
|
+// LocalDateTime localDateTime = LocalDateTime.parse(time, inputFormatter);
|
|
|
+// // Convert the LocalDateTime to Instant (UTC time zone)
|
|
|
+// Instant instant = localDateTime.atZone(ZoneOffset.UTC).toInstant();
|
|
|
+// // Format the Instant to ISO 8601
|
|
|
+// return outputFormatter.format(instant.atZone(ZoneOffset.UTC));
|
|
|
}
|
|
|
|
|
|
-// /**
|
|
|
-// * 根据时间范围、设备ID、路数和 bucket 查询传感器数据,并筛选需要的字段
|
|
|
-// *
|
|
|
-// * @param startTime 起始时间
|
|
|
-// * @param endTime 结束时间
|
|
|
-// * @param deviceId 设备ID
|
|
|
-// * @param roads 路数
|
|
|
-// * @param fields 查询的字段列表,可以包含 "temperature", "humidity", "co2", "battery", "plugInStatus", etc.
|
|
|
-// * @return 查询到的传感器数据列表
|
|
|
-// */
|
|
|
-// public List<SensorData> queryDataByTimeAndDevice(String startTime, String endTime, String deviceId, Integer roads, List<String> fields) {
|
|
|
-// // 构建查询字段的条件
|
|
|
-// String fieldsToSelect = fields.isEmpty() ? "*" : String.join(",", fields);
|
|
|
-// // 构建Flux查询语句
|
|
|
-// String query = String.format("from(bucket: \"%s\") |> range(start: %s, stop: %s) " + "|> filter(fn: (r) => r._measurement == \"sensor_data\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%d\") " + "|> sort(columns: [\"_time\"], desc: true) " + "|> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\") " + "|> keep(columns: [\"_time\", %s])", getBucketName(), startTime, endTime, deviceId, roads, fieldsToSelect);
|
|
|
-//
|
|
|
-// // 查询InfluxDB并返回结果
|
|
|
-// QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
-// List<FluxTable> results = queryApi.query(query);
|
|
|
-//
|
|
|
-// // 将查询结果转换为 SensorData 实体
|
|
|
-// return results.stream().flatMap(table -> table.getRecords().stream()).map(fluxRecord -> mapRecordToEntity(fluxRecord, getEntityClass())) // 转换为 SensorData 实体
|
|
|
-// .collect(Collectors.toList()); // 返回多条数据
|
|
|
-// }
|
|
|
-//
|
|
|
-// /**
|
|
|
-// * 根据 deviceId、roads、时间范围和查询条件查询数据
|
|
|
-// *
|
|
|
-// * @param deviceId 设备 ID
|
|
|
-// * @param roads 路数
|
|
|
-// * @param startTime 查询开始时间
|
|
|
-// * @param endTime 查询结束时间
|
|
|
-// * @param queryConditions 查询条件列表
|
|
|
-// * @return 查询到的传感器数据列表
|
|
|
-// */
|
|
|
-// public List<SensorData> querySensorData(String deviceId, int roads, Instant startTime, Instant endTime, List<QueryCondition> queryConditions) {
|
|
|
-// String fluxQuery = buildFluxQuery(deviceId, roads, startTime, endTime, queryConditions, SensorData.class);
|
|
|
-// QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
-// List<FluxTable> tables = queryApi.query(fluxQuery);
|
|
|
-// // 将查询结果映射为 SensorData 实体
|
|
|
-// return tables.stream().flatMap(table -> table.getRecords().stream()).map(fluxRecord -> mapRecordToEntity(fluxRecord, getEntityClass())).collect(Collectors.toList());
|
|
|
-// }
|
|
|
}
|