springboot 里面异步处理消费到的 kafka 消息,如何保证不丢消息呢

2023-10-17 20:23:52 +08:00
 NoKey
在 springboot 里面写 kafka 消费代码
为了增加消费速度,写了一个线程池,用来异步处理消费的消息
那么,请教一下各位大佬
如果,异步处理的时候,出现问题了,怎么重复消费这条消息呢?
处理和消费是分开的,处理这里出问题了,消费那里无感知
请各位大佬指点,谢谢
2971 次点击
所在节点    Java
30 条回复
rainbowStay
2023-10-18 14:21:29 +08:00
@totoro52 #3 提个问题,如果不手动触发确认,MQ 是会阻塞还是会继续处理后面的其他消息?
zhhmax
2023-10-18 14:48:22 +08:00
@rainbowStay 不用重发了,因为消息已经发成功了,有 ID 就能通过其他方式拿到完整消息的,要是消息再发一遍就重复了,这里和消费已经没关系了。楼主的异步处理消息的方式已经决定了不能再有重发消息的步骤,不然处理消息的那部分逻辑还得加上消息是否是处理失败的消息,增加代码复杂度。
w4n9hu1
2023-10-18 14:57:48 +08:00
加个 retry queue 和 dlq? 之前总结过一篇文章 https://w4n9hu1.dev/2023/05/26/exception-handling-in-event-driven-architectures/
rainbowStay
2023-10-18 14:59:15 +08:00
@zhhmax #22 根据 ID 能怎么拿到完整消息啊。。。那我理解要么就上游保留了关联 id 的消息信息,要么就是楼主在消费信息时自己保存一遍关联 id 的消息信息,这样不是更加大了复杂性吗?
zhhmax
2023-10-18 15:24:34 +08:00
@rainbowStay 我们可以讨论更严谨一点,这个 kafka 如果连接的是两个不同的系统,那么你说的无法通过 ID 得到完整消息确实是个问题,保存 ID 的时候可以做到把这条失败的完整消息再保存到其他地方而不用再考虑重发问题。如果是上游的消息生产者也是自己内部系统,只能在消息队列中才能得知完整的消息内容而无法通过 ID 再从其他途径得到同样的内容在我个人看来是属于重大的设计缺陷,即便是这样,那也可以换个方式,新建一个队列把失败的消息放进去让消费原来消息队列的任务也监听一下这个队列就可以了,那么新的问题又来了,这样做到重发了,但是如果任务一直处理失败会不会放大数据量引发其他问题,比如说某段时间某批数据永远无法处理成功而一直重发会不会影响到其他批次正常消息的处理效率问题。
liprais
2023-10-18 15:28:31 +08:00
消费成功了再去更新 offset,不成功就重试呗
你不更新 offset 永远读的是这一条就完了
rainbowStay
2023-10-18 16:23:06 +08:00
给楼主找了下 Kafka 的文档: https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html ,直接搜”consume a batch of records“定位到关键部分
fkdog
2023-10-18 16:32:27 +08:00
假设前提是,你的 kafka topic 允许乱序消费。
1. 一次从 kafka 里取 n 条数据,比如 100 条。
2. 然后 100 条丢异步处理。手动 ack 从第一条开始消息到第一条失败消息区间里最大的那条 offset 作为 ack 提交。比如 1 ~ 33 是成功的,34 是失败的,那么 ack 33.
3. 消费端做幂等处理。防止消息重复消费。
4. 对于重复发生错误的消息,做 retry n 次还是失败的话,单独记录人工介入。
NoKey
2023-10-18 20:47:51 +08:00
@fkdog 感谢,感觉这说的很全面了,但是,这个依然是阻塞式的,就是这 100 条会阻塞着处理完成后,才可能进行下一组,然后,异步处理 100 条,找到成功处理的 offset ,这个也是要等到这 100 个全部完成了才知道,中途是不确定的。可能还是楼上说的,要异步处理可能需要手动解决失败的信息,放重试队列或者放数据库里。
fkdog
2023-10-19 10:26:58 +08:00
@NoKey
所以你说的“阻塞式”是有什么问题吗?
不想丢消息,你还是要用到 ack offset 。
只要你是在本地开了线程池异步去消费消息,因为线程池也是乱序的,那么你还是需要记录每条消息的处理结果,然后合并计算连续成功区间内的最后一条 offset 。

感觉你的想法是来一条就自动 offset 一条,然后本地保存这条记录。
那还不如直接任务存数据库+定时轮训扫描。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/982892

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX