|
@@ -8,6 +8,7 @@ import com.influxdb.client.QueryApi;
|
|
import com.influxdb.query.FluxTable;
|
|
import com.influxdb.query.FluxTable;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
import vip.xiaonuo.coldchain.core.bean.influxdb.SensorData;
|
|
import vip.xiaonuo.coldchain.core.bean.influxdb.SensorData;
|
|
|
|
+import vip.xiaonuo.coldchain.core.util.DateFormatter;
|
|
|
|
|
|
import java.time.Duration;
|
|
import java.time.Duration;
|
|
import java.time.Instant;
|
|
import java.time.Instant;
|
|
@@ -70,8 +71,9 @@ public class JfcloudSensorDataService extends JfcloudFluxDataService<SensorData>
|
|
Assert.notNull(roads, "roads cannot be null");
|
|
Assert.notNull(roads, "roads cannot be null");
|
|
Assert.notNull(startTimeStr, "startTime cannot be null");
|
|
Assert.notNull(startTimeStr, "startTime cannot be null");
|
|
Assert.notNull(endTimeStr, "endTime cannot be null");
|
|
Assert.notNull(endTimeStr, "endTime cannot be null");
|
|
- startTimeStr=startTimeStr.trim();
|
|
|
|
- endTimeStr=endTimeStr.trim();
|
|
|
|
|
|
+ //替换起始时间和结束时间2024/09/09 这种格式为2024-09-09
|
|
|
|
+ startTimeStr = DateFormatter.replaceDateFormat(startTimeStr);
|
|
|
|
+ endTimeStr = DateFormatter.replaceDateFormat(endTimeStr);
|
|
// 如果只有日期部分,则手动补充时间部分(00:00:00)
|
|
// 如果只有日期部分,则手动补充时间部分(00:00:00)
|
|
if (startTimeStr.length() == 10) {
|
|
if (startTimeStr.length() == 10) {
|
|
startTimeStr += " 00:00:00";
|
|
startTimeStr += " 00:00:00";
|
|
@@ -104,26 +106,7 @@ public class JfcloudSensorDataService extends JfcloudFluxDataService<SensorData>
|
|
* @return The Flux query as a string.
|
|
* @return The Flux query as a string.
|
|
*/
|
|
*/
|
|
public static String buildRangeTimeFluxQuery(String bucketName, String deviceId, String roads, String startTime, String stopTime, String aggregationWindow) {
|
|
public static String buildRangeTimeFluxQuery(String bucketName, String deviceId, String roads, String startTime, String stopTime, String aggregationWindow) {
|
|
- return String.format(
|
|
|
|
- "temperature = from(bucket: \"%s\")\n" +
|
|
|
|
- " |> range(start: time(v: \"%s\"), stop: time(v: \"%s\"))\n" +
|
|
|
|
- " |> filter(fn: (r) => r[\"_measurement\"] == \"sensor_data\" and r[\"_field\"] == \"temperature\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\")\n" +
|
|
|
|
- " |> aggregateWindow(every: %s, fn: mean, createEmpty: false)\n" +
|
|
|
|
- "humidity = from(bucket: \"%s\")\n" +
|
|
|
|
- " |> range(start: time(v: \"%s\"), stop: time(v: \"%s\"))\n" +
|
|
|
|
- " |> filter(fn: (r) => r[\"_measurement\"] == \"sensor_data\" and r[\"_field\"] == \"humidity\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\")\n" +
|
|
|
|
- " |> aggregateWindow(every: %s, fn: mean, createEmpty: false)\n" +
|
|
|
|
- "co2 = from(bucket: \"%s\")\n" +
|
|
|
|
- " |> range(start: time(v: \"%s\"), stop: time(v: \"%s\"))\n" +
|
|
|
|
- " |> filter(fn: (r) => r[\"_measurement\"] == \"sensor_data\" and r[\"_field\"] == \"co2\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\")\n" +
|
|
|
|
- " |> aggregateWindow(every: %s, fn: mean, createEmpty: false)\n" +
|
|
|
|
- "union(tables: [temperature, humidity, co2])\n" +
|
|
|
|
- " |> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")\n" +
|
|
|
|
- " |> yield(name: \"latest_data\")",
|
|
|
|
- bucketName, startTime, stopTime, deviceId, roads, aggregationWindow,
|
|
|
|
- bucketName, startTime, stopTime, deviceId, roads, aggregationWindow,
|
|
|
|
- bucketName, startTime, stopTime, deviceId, roads, aggregationWindow
|
|
|
|
- );
|
|
|
|
|
|
+ return String.format("temperature = from(bucket: \"%s\")\n" + " |> range(start: time(v: \"%s\"), stop: time(v: \"%s\"))\n" + " |> filter(fn: (r) => r[\"_measurement\"] == \"sensor_data\" and r[\"_field\"] == \"temperature\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\")\n" + " |> aggregateWindow(every: %s, fn: mean, createEmpty: false)\n" + "humidity = from(bucket: \"%s\")\n" + " |> range(start: time(v: \"%s\"), stop: time(v: \"%s\"))\n" + " |> filter(fn: (r) => r[\"_measurement\"] == \"sensor_data\" and r[\"_field\"] == \"humidity\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\")\n" + " |> aggregateWindow(every: %s, fn: mean, createEmpty: false)\n" + "co2 = from(bucket: \"%s\")\n" + " |> range(start: time(v: \"%s\"), stop: time(v: \"%s\"))\n" + " |> filter(fn: (r) => r[\"_measurement\"] == \"sensor_data\" and r[\"_field\"] == \"co2\" and r[\"device_id\"] == \"%s\" and r[\"roads\"] == \"%s\")\n" + " |> aggregateWindow(every: %s, fn: mean, createEmpty: false)\n" + "union(tables: [temperature, humidity, co2])\n" + " |> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")\n" + " |> yield(name: \"latest_data\")", bucketName, startTime, stopTime, deviceId, roads, aggregationWindow, bucketName, startTime, stopTime, deviceId, roads, aggregationWindow, bucketName, startTime, stopTime, deviceId, roads, aggregationWindow);
|
|
}
|
|
}
|
|
|
|
|
|
public static String determineAggregationWindow(String startTime, String stopTime) {
|
|
public static String determineAggregationWindow(String startTime, String stopTime) {
|