|
@@ -5,6 +5,8 @@ import com.github.jfcloud.influxdb.flux.QueryCondition;
|
|
|
import com.github.jfcloud.influxdb.service.JfcloudInfluxDBService;
|
|
|
import com.influxdb.annotations.Column;
|
|
|
import com.influxdb.annotations.Measurement;
|
|
|
+import com.influxdb.client.QueryApi;
|
|
|
+import com.influxdb.query.FluxTable;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import vip.xiaonuo.coldchain.core.config.JfcloudColdChainConstants;
|
|
@@ -80,6 +82,67 @@ public class SensorDataService {
|
|
|
return fluxQuery.toString();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 根据 deviceId 和 roads 查询最新的数据
|
|
|
+ *
|
|
|
+ * @param deviceId 设备 ID
|
|
|
+ * @param roads 路数
|
|
|
+ * @return 最新的传感器数据
|
|
|
+ */
|
|
|
+ public SensorData queryLatestDataByDeviceIdAndRoads(String deviceId, Integer roads) {
|
|
|
+ // 构建 Flux 查询,获取最新一条记录
|
|
|
+// String fluxQuery = String.format(
|
|
|
+// "from(bucket: \"%s\") " +
|
|
|
+// "|> range(start: -30d) " + // 查询过去 30 天的数据
|
|
|
+// "|> filter(fn: (r) => r._measurement == \"%s\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\") " +
|
|
|
+// "|> sort(columns: [\"_time\"], desc: true) " + // 按时间降序排序
|
|
|
+// "|> limit(n: 1)", // 限制查询结果为 1 条
|
|
|
+// getBucketName(), // 动态获取桶名
|
|
|
+// getMeasurement(SensorData.class), // 动态获取 measurement 名称
|
|
|
+// deviceId, // 设备 ID
|
|
|
+// roads // 路数
|
|
|
+// );
|
|
|
+ String fluxQuery = String.format(
|
|
|
+ "from(bucket: \"%s\") " +
|
|
|
+ "|> range(start: -30d) " +
|
|
|
+ "|> filter(fn: (r) => r._measurement == \"%s\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\") " +
|
|
|
+ "|> sort(columns: [\"_time\"], desc: true) " +
|
|
|
+ "|> limit(n: 1) " +
|
|
|
+ "|> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\") " +
|
|
|
+ "|> keep(columns: [\"_time\", \"temperature\", \"humidity\", \"co2\", \"battery\", \"plugInStatus\", \"location\", \"longitude\", \"latitude\", \"device_id\", \"roads\", \"modelName\"])",
|
|
|
+ getBucketName(),
|
|
|
+ getMeasurement(SensorData.class),
|
|
|
+ deviceId,
|
|
|
+ roads
|
|
|
+ );
|
|
|
+ QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
+ List<FluxTable> tables = queryApi.query(fluxQuery);
|
|
|
+
|
|
|
+ // 将查询结果映射为 SensorData 实体
|
|
|
+ return tables.stream()
|
|
|
+ .flatMap(table -> table.getRecords().stream()) // 拉平所有记录
|
|
|
+ .map(SensorData::mapToSensorData) // 将每个记录映射为 SensorData 实体
|
|
|
+ .findFirst() // 只取第一条记录(即最新一条记录)
|
|
|
+ .orElse(null); // 如果没有数据,返回 null
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据 deviceId、roads 和时间范围查询数据
|
|
|
+ *
|
|
|
+ * @param deviceId 设备 ID
|
|
|
+ * @param roads 路数
|
|
|
+ * @param startTime 查询开始时间
|
|
|
+ * @param endTime 查询结束时间
|
|
|
+ * @return 查询到的传感器数据列表
|
|
|
+ */
|
|
|
+ public List<SensorData> queryDataByDeviceIdAndRoads(String deviceId, Integer roads, String startTime, String endTime) {
|
|
|
+ String query = String.format("SELECT * FROM \"%s\" WHERE \"device_id\" = '%s' AND \"roads\" = %d AND time >= '%s' AND time <= '%s' ORDER BY time DESC", getMeasurement(SensorData.class), deviceId, roads, startTime, endTime);
|
|
|
+ QueryApi queryApi = jfcloudInfluxDBService.getInfluxDBClient().getQueryApi();
|
|
|
+ List<FluxTable> results = queryApi.query(query);
|
|
|
+
|
|
|
+ // 将查询结果转换为 SensorData 实体
|
|
|
+ return results.stream().flatMap(table -> table.getRecords().stream()).map(SensorData::mapToSensorData).collect(Collectors.toList());
|
|
|
+ }
|
|
|
|
|
|
|
|
|
/**
|