如何从 Kafka 消费的数据中取最大值存储?

2022-08-11 22:19:42 +08:00
 chi1st

从 Kafka 每分钟接收数千条消息,每条消息中有 1000 条记录信息,记录 record 内容为(ip, time ,value)

相同 ip ,time 的 record 会有多个不同值,且有可能是不同分区,不同消息中传过来的,消费到的时间也可能相隔的比较长

如何快速找到同一 ip ,同一时间的最大 value ?

1444 次点击
所在节点    Kafka
4 条回复
Jooooooooo
2022-08-11 22:29:29 +08:00
看你的寻找是什么逻辑.

有一个办法是, 先放进缓存里, 过一小段时间再去处理

缓存的 key 就是 time, 弄一个 set 结构, time -> [(ip1, value1), (ip2, value2)]

紧接着, 再搞一个定时任务, 捞出当前时间 - 5 分钟 (这得看 time 的粒度)所有记录, 分别计算 time 对应的 maxValue

最后, 用的时候直接取最后算的结果就行. 这里主要要考虑所需数据的时效和接收消息的延迟问题, 比如当前是 0 分钟, 你在第一分钟需要第 0 分钟的最大值, 但是第二分钟又来了一条第 0 分钟的数据怎么办?
chi1st
2022-08-11 22:49:33 +08:00
@Jooooooooo 感谢大佬回复,完全理解了我问题的痛点,我目前的思路和你说的基本一致, 目前我有存一份全量数据在 MongoDB 中,想着从这份数据中定时查询当前时间- n 分钟中的 max value ip, 因为数据量比较大,查询速度有待测试,可能放在 Redis 里查比较好。
最大值是为了推到监控中,所以时效性还是要求比较高的,这个 n 分钟具体数值目前也不太能确定。

在这之前我有考虑过将所有数据接收后都传到一个专门的处理程序里,对相同( ip ,time )的 value 进行取最大值的操作,这个程序对同一( ip ,time )的处理设置一个超时时间,超过两分钟就认为没有其他数据了,直接将最大值推送至监控。
如果这样做的话感觉写起来比较困难,需要对记录进行轮询(不知道有没有更好的处理手段),处理数据的时间有可能会很长反而更影响时效性
Jooooooooo
2022-08-11 22:58:48 +08:00
@chi1st 如果你是为了加工数据做后续监控的话, 分钟级别的粒度应该够了. 因为监控是为了告警后续再人工解决的话, 基本上时间瓶颈不会在监控这一两分钟上, 一般都是人响应慢或者处理太慢导致问题解决时间长.

可以考虑在第 1 分 15 秒的时候去处理第 0 到第 1 分钟的数据(留有数据入库的延迟). 加工完成之后就获得了第 0 分钟的数据(加工时间应该不会超过 10s), 那么正常情况下在第 1 分 30 秒的时候就具有第 0 分钟的数据, 可以从此时去做监控相关的内容(比如捞出前 5 分钟加工好的数据, 看波动之类的指标是否有异常). 有异常则发出告警. 监控的延迟可以认为是 2 分钟以内发现问题.
chi1st
2022-08-11 23:14:45 +08:00
@Jooooooooo 再次感谢,“监控是为了告警后续再人工解决”,我也是这么理解的,对于网络相关数据的监控确实也不需要非常实时,重要的是相对及时的告警,和使用趋势图。
如果要有完整的曲线图,那么还要保证消费数据的及时性,如果我现在消费的是半小时前的数据,那么肯定计算不到当前时间前几分钟的数据,那么告警和趋势图数据也都没了。。。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/872301

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX