|
@@ -0,0 +1,110 @@
|
|
|
+package vip.xiaonuo.coldchain.core.service;
|
|
|
+
|
|
|
+import com.github.jfcloud.influxdb.config.JfcloudInfluxDB2Properties;
|
|
|
+import com.github.jfcloud.influxdb.flux.JfcloudFluxDataService;
|
|
|
+import com.github.jfcloud.influxdb.flux.QueryCondition;
|
|
|
+import com.github.jfcloud.influxdb.service.JfcloudInfluxDBService;
|
|
|
+import com.influxdb.client.QueryApi;
|
|
|
+import com.influxdb.query.FluxTable;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+import vip.xiaonuo.coldchain.core.bean.influxdb.SensorData;
|
|
|
+
|
|
|
+import java.time.Instant;
|
|
|
+import java.util.List;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author jackzhou
|
|
|
+ * @version 1.0
|
|
|
+ * @project jfcloud-coldchain
|
|
|
+ * @description
|
|
|
+ * @date 2024/11/25 12:35:02
|
|
|
+ */
|
|
|
+@Service
|
|
|
+public class JfcloudSensorDataService extends JfcloudFluxDataService<SensorData> {
|
|
|
+ public JfcloudSensorDataService(JfcloudInfluxDBService jfcloudInfluxDBService, JfcloudInfluxDB2Properties jfcloudInfluxDB2Properties) {
|
|
|
+ super(jfcloudInfluxDBService, jfcloudInfluxDB2Properties);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据 deviceId 和 roads 查询最新的数据
|
|
|
+ *
|
|
|
+ * @param deviceId 设备 ID
|
|
|
+ * @param roads 路数
|
|
|
+ * @return 最新的传感器数据
|
|
|
+ */
|
|
|
+ public SensorData queryLatestDataByDeviceIdAndRoads(String deviceId, Integer roads) {
|
|
|
+ // 构建 Flux 查询,获取最新一条记录
|
|
|
+// String fluxQuery = String.format(
|
|
|
+// "from(bucket: \"%s\") " +
|
|
|
+// "|> range(start: -30d) " + // 查询过去 30 天的数据
|
|
|
+// "|> filter(fn: (r) => r._measurement == \"%s\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\") " +
|
|
|
+// "|> sort(columns: [\"_time\"], desc: true) " + // 按时间降序排序
|
|
|
+// "|> limit(n: 1)", // 限制查询结果为 1 条
|
|
|
+// getBucketName(), // 动态获取桶名
|
|
|
+// getMeasurement(SensorData.class), // 动态获取 measurement 名称
|
|
|
+// deviceId, // 设备 ID
|
|
|
+// roads // 路数
|
|
|
+// );
|
|
|
+ String fluxQuery = String.format(
|
|
|
+ "from(bucket: \"%s\") " +
|
|
|
+ "|> range(start: -30d) " +
|
|
|
+ "|> filter(fn: (r) => r._measurement == \"%s\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\") " +
|
|
|
+ "|> sort(columns: [\"_time\"], desc: true) " +
|
|
|
+ "|> limit(n: 1) " +
|
|
|
+ "|> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\") " +
|
|
|
+ "|> keep(columns: [\"_time\", \"temperature\", \"humidity\", \"co2\", \"battery\", \"plugInStatus\", \"location\", \"longitude\", \"latitude\", \"device_id\", \"roads\", \"modelName\"])",
|
|
|
+ getBucketName(),
|
|
|
+ getMeasurement(SensorData.class),
|
|
|
+ deviceId,
|
|
|
+ roads
|
|
|
+ );
|
|
|
+ QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
+ List<FluxTable> tables = queryApi.query(fluxQuery);
|
|
|
+
|
|
|
+ // 将查询结果映射为 SensorData 实体
|
|
|
+ return tables.stream()
|
|
|
+ .flatMap(table -> table.getRecords().stream()) // 拉平所有记录
|
|
|
+ .map(SensorData::mapToSensorData) // 将每个记录映射为 SensorData 实体
|
|
|
+ .findFirst() // 只取第一条记录(即最新一条记录)
|
|
|
+ .orElse(null); // 如果没有数据,返回 null
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据 deviceId、roads 和时间范围查询数据
|
|
|
+ *
|
|
|
+ * @param deviceId 设备 ID
|
|
|
+ * @param roads 路数
|
|
|
+ * @param startTime 查询开始时间
|
|
|
+ * @param endTime 查询结束时间
|
|
|
+ * @return 查询到的传感器数据列表
|
|
|
+ */
|
|
|
+ public List<SensorData> queryDataByDeviceIdAndRoads(String deviceId, Integer roads, String startTime, String endTime) {
|
|
|
+ String query = String.format("SELECT * FROM \"%s\" WHERE \"device_id\" = '%s' AND \"roads\" = %d AND time >= '%s' AND time <= '%s' ORDER BY time DESC", getMeasurement(SensorData.class), deviceId, roads, startTime, endTime);
|
|
|
+ QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
+ List<FluxTable> results = queryApi.query(query);
|
|
|
+ // 将查询结果转换为 SensorData 实体
|
|
|
+ return results.stream().flatMap(table -> table.getRecords().stream()).map(SensorData::mapToSensorData).collect(Collectors.toList());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据 deviceId、roads、时间范围和查询条件查询数据
|
|
|
+ *
|
|
|
+ * @param deviceId 设备 ID
|
|
|
+ * @param roads 路数
|
|
|
+ * @param startTime 查询开始时间
|
|
|
+ * @param endTime 查询结束时间
|
|
|
+ * @param queryConditions 查询条件列表
|
|
|
+ * @return 查询到的传感器数据列表
|
|
|
+ */
|
|
|
+ public List<SensorData> querySensorData(String deviceId, int roads, Instant startTime, Instant endTime, List<QueryCondition> queryConditions) {
|
|
|
+ String fluxQuery = buildFluxQuery(deviceId, roads, startTime, endTime, queryConditions, SensorData.class);
|
|
|
+ QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
+ List<FluxTable> tables = queryApi.query(fluxQuery);
|
|
|
+ // 将查询结果映射为 SensorData 实体
|
|
|
+ return tables.stream().flatMap(table -> table.getRecords().stream()).map(SensorData::mapToSensorData).collect(Collectors.toList());
|
|
|
+ }
|
|
|
+}
|