请问 InfluxDB 如何返回筛选后的多个 series 的平均值 series

2023-01-11 14:08:43 +08:00
 dragonszy

之前一直用 pandas 做数据处理,刚接触时序数据库 Influxdb 2.6 ,有个地方不太明白,想请教一下。

假设 tag 是 lat 和 lon ,组合有(32,119),(33,119),(32,120),(33,120),即32<=lat<=33, 119<=lon<=120

假设 field 有温度 temperature,风速 wind_speed 等字段

时间戳范围是 2021-01-01 到 2021-12-31

我想返回这些经纬度 temperature 和 wind_speed 的平均值时间序列,类似下面这些数据,时间和所需的 field 的均值。

time,temperature,wind_speed
2021-01-01,10.2,5.2
2021-01-02,11.1,4.2
.....
2021-12-30,5.6,4.1
2021-12-31,7.6,3.6

还有一个补充问题就是能否像 pandas agg 一样,对于不同列(field)做不同统计,比如 temperature 求 mean ,wind_speed 求 max 之类的。

请问应该如何写 InfluxQL 或者 Flux 语句,求一下大神指导。

965 次点击
所在节点    问与答
5 条回复
sc104501
2023-01-11 14:52:44 +08:00
用最新的 influxdb 的话建议用 flux 。
首先取温度,做 mean ,结果保存到 a1 。
然后取风速,做 max ,保存到 a2 。
最后调用 flux 的 join.inner(),基于“时序+tag”组合两个结果输出就可以了。

如果找到了更合适的方法告诉我一声,手动狗头。

另外,经纬度除非可能性特别少,不然还是作为 field 比较合适,max-values-per-tag 根据官方的建议不建议超过 100000 。
dragonszy
2023-01-11 15:53:49 +08:00
@sc104501 感谢回复,“首先取温度,做 mean ,结果保存到 a1”就是这块不太会弄,之前这么写的,返回值就不是时间序列了,而是每个经纬度的平均值。

```
query = f"""
from(bucket: "xny_data")
|> range(start: {start_timestamp}, stop: {end_timestamp})
|> filter(fn: (r) => r["_measurement"] == "weatherData")
|> filter(fn: (r) => r["lat"] >= "20")
|> filter(fn: (r) => r["lon"] >= "119")
|> filter(fn: (r) => r["_field"] == "cloudcover")
|> mean()
"""
tables = query_api.query(query, org="xny")
for table in tables:
for record in table.records:
print(record)
```
返回值
```
[<FluxTable: 9 columns, 1 records>, <FluxTable: 9 columns, 1 records>]
```

```
FluxRecord() table: 0, {'result': '_result', 'table': 0, '_start': datetime.datetime(2020, 12, 31, 16, 0, tzinfo=tzutc()), '_stop': datetime.datetime(2021, 12, 31, 16, 0, tzinfo=tzutc()), '_field': 'cloudcover', '_measurement': 'weatherData', 'lat': '32', 'lon': '119', '_value': 43.14566210045662} FluxRecord() table: 1, {'result': '_result', 'table': 1, '_start': datetime.datetime(2020, 12, 31, 16, 0, tzinfo=tzutc()), '_stop': datetime.datetime(2021, 12, 31, 16, 0, tzinfo=tzutc()), '_field': 'cloudcover', '_measurement': 'weatherData', 'lat': '34', 'lon': '120', '_value': 39.43207762557078}
```
sc104501
2023-01-12 10:03:44 +08:00
@dragonszy 每天一个数据应该是用 aggregateWindow 。按照一个时间窗口汇总数据。
https://docs.influxdata.com/flux/v0.x/stdlib/universe/aggregatewindow/

代码没有测试过,可能不对。
from(bucket: "xny_data")
|> range(start: {start_timestamp}, stop: {end_timestamp})
|> filter(fn: (r) => r["_measurement"] == "weatherData")
|> filter(fn: (r) => r["lat"] >= "20")
|> filter(fn: (r) => r["lon"] >= "119")
|> filter(fn: (r) => r["_field"] == "cloudcover")
|> aggregateWindow(every: 1d, fn: mean)
dragonszy
2023-01-13 15:58:41 +08:00
@sc104501 非常感谢!但是目前还是有个问题,我想得到一个平均值时间序列(不分经纬度),现在采用 aggregateWindow 还是每个经纬度返回一个序列,想知道一下最终得到的是一个根据经纬度平均的序列该怎么做。

其实意思就是,我有 10 个时间序列,怎么求平均,得到 1 个时间序列。
sc104501
2023-01-16 10:15:43 +08:00
@dragonszy 意思是返回的 tables 含有多个表(每个表一个经纬度组合)需要合并成一个坐标范围内的平均值吧。

如果是数据点数量加权,在 aggregateWindow() 前加上 group() 取消分组。
- 原始数据每个数据的权重相同。
如果是“经纬度组合后的时间序列”加权,在 aggregateWindow() 后加上 group() 再用 aggregateWindow() 求一个平均值。
- 范围内的每个坐标点(采集器?)不管产生的数据多少,权重相同。

关于 group(),group 完全不带参数就是 ungroup 的功能。
或者直接用 group 按一个其他指定的 tag 分组,应该也可以避免现在的经纬度分组。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/908138

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX