之前一直用 pandas 做数据处理,刚接触时序数据库 Influxdb 2.6 ,有个地方不太明白,想请教一下。
假设 tag 是 lat 和 lon ,组合有(32,119),(33,119),(32,120),(33,120)
,即32<=lat<=33, 119<=lon<=120
假设 field 有温度 temperature,风速 wind_speed 等字段
时间戳范围是 2021-01-01 到 2021-12-31
我想返回这些经纬度 temperature 和 wind_speed 的平均值时间序列,类似下面这些数据,时间和所需的 field 的均值。
time,temperature,wind_speed
2021-01-01,10.2,5.2
2021-01-02,11.1,4.2
.....
2021-12-30,5.6,4.1
2021-12-31,7.6,3.6
还有一个补充问题就是能否像 pandas agg 一样,对于不同列(field)做不同统计,比如 temperature 求 mean ,wind_speed 求 max 之类的。
请问应该如何写 InfluxQL 或者 Flux 语句,求一下大神指导。
1
sc104501 2023-01-11 14:52:44 +08:00
用最新的 influxdb 的话建议用 flux 。
首先取温度,做 mean ,结果保存到 a1 。 然后取风速,做 max ,保存到 a2 。 最后调用 flux 的 join.inner(),基于“时序+tag”组合两个结果输出就可以了。 如果找到了更合适的方法告诉我一声,手动狗头。 另外,经纬度除非可能性特别少,不然还是作为 field 比较合适,max-values-per-tag 根据官方的建议不建议超过 100000 。 |
2
dragonszy OP @sc104501 感谢回复,“首先取温度,做 mean ,结果保存到 a1”就是这块不太会弄,之前这么写的,返回值就不是时间序列了,而是每个经纬度的平均值。
``` query = f""" from(bucket: "xny_data") |> range(start: {start_timestamp}, stop: {end_timestamp}) |> filter(fn: (r) => r["_measurement"] == "weatherData") |> filter(fn: (r) => r["lat"] >= "20") |> filter(fn: (r) => r["lon"] >= "119") |> filter(fn: (r) => r["_field"] == "cloudcover") |> mean() """ tables = query_api.query(query, org="xny") for table in tables: for record in table.records: print(record) ``` 返回值 ``` [<FluxTable: 9 columns, 1 records>, <FluxTable: 9 columns, 1 records>] ``` ``` FluxRecord() table: 0, {'result': '_result', 'table': 0, '_start': datetime.datetime(2020, 12, 31, 16, 0, tzinfo=tzutc()), '_stop': datetime.datetime(2021, 12, 31, 16, 0, tzinfo=tzutc()), '_field': 'cloudcover', '_measurement': 'weatherData', 'lat': '32', 'lon': '119', '_value': 43.14566210045662} FluxRecord() table: 1, {'result': '_result', 'table': 1, '_start': datetime.datetime(2020, 12, 31, 16, 0, tzinfo=tzutc()), '_stop': datetime.datetime(2021, 12, 31, 16, 0, tzinfo=tzutc()), '_field': 'cloudcover', '_measurement': 'weatherData', 'lat': '34', 'lon': '120', '_value': 39.43207762557078} ``` |
3
sc104501 2023-01-12 10:03:44 +08:00
@dragonszy 每天一个数据应该是用 aggregateWindow 。按照一个时间窗口汇总数据。
https://docs.influxdata.com/flux/v0.x/stdlib/universe/aggregatewindow/ 代码没有测试过,可能不对。 from(bucket: "xny_data") |> range(start: {start_timestamp}, stop: {end_timestamp}) |> filter(fn: (r) => r["_measurement"] == "weatherData") |> filter(fn: (r) => r["lat"] >= "20") |> filter(fn: (r) => r["lon"] >= "119") |> filter(fn: (r) => r["_field"] == "cloudcover") |> aggregateWindow(every: 1d, fn: mean) |
4
dragonszy OP @sc104501 非常感谢!但是目前还是有个问题,我想得到一个平均值时间序列(不分经纬度),现在采用 aggregateWindow 还是每个经纬度返回一个序列,想知道一下最终得到的是一个根据经纬度平均的序列该怎么做。
其实意思就是,我有 10 个时间序列,怎么求平均,得到 1 个时间序列。 |
5
sc104501 2023-01-16 10:15:43 +08:00
@dragonszy 意思是返回的 tables 含有多个表(每个表一个经纬度组合)需要合并成一个坐标范围内的平均值吧。
如果是数据点数量加权,在 aggregateWindow() 前加上 group() 取消分组。 - 原始数据每个数据的权重相同。 如果是“经纬度组合后的时间序列”加权,在 aggregateWindow() 后加上 group() 再用 aggregateWindow() 求一个平均值。 - 范围内的每个坐标点(采集器?)不管产生的数据多少,权重相同。 关于 group(),group 完全不带参数就是 ungroup 的功能。 或者直接用 group 按一个其他指定的 tag 分组,应该也可以避免现在的经纬度分组。 |