springboot 里面异步处理消费到的 kafka 消息,如何保证不丢消息呢

253 天前
 NoKey
在 springboot 里面写 kafka 消费代码
为了增加消费速度,写了一个线程池,用来异步处理消费的消息
那么,请教一下各位大佬
如果,异步处理的时候,出现问题了,怎么重复消费这条消息呢?
处理和消费是分开的,处理这里出问题了,消费那里无感知
请各位大佬指点,谢谢
2705 次点击
所在节点    Java
30 条回复
maocat
253 天前
消息 ID ,加日志
totoro52
253 天前
消费确认
totoro52
253 天前
改为手动确认模式,当消费者消费完成后手动触发确认,MQ 才会删除这条消息。
hidemyself
253 天前
后处理呗,存本地表,定时任务扫描
oneisall8955
253 天前
手动 ACK
tramm
252 天前
那就不要异步呗...
手动提交的话,那就得等结果出来,跟同步处理没啥区别吧
xiaofan2
252 天前
很明显你是没有开启手动提交 offset?
正常来说,你消息扔到线程池后应该返回一个 Future ,然后你需要等待你这个消费处理完成,再给 broker 返回提交成功
apisces
252 天前
有个疑问,如果手动 ack 的话,那也需要等到线程池的 Future 返回才提交,这样和同步的区别在哪里呢
NoKey
252 天前
@xiaofan2 这样是不是无法实现不断的处理消息的目的,总是要等待这一批消息处理完成,才可以继续处理下一批?
WashFreshFresh
252 天前
@NoKey 处理完后手动 ack 比较好 没有 ack 的消息是会重发的
lx0319
252 天前
去重机制。 或者设计容忍重复的方式,用于重复消费时,保持幂等
可以处理后记录 offset 和 partition ,真出现问题,从故障 offset 重取就好 kafka 重置 offset 也可以,另开一个 group ,单独消费这条数据也行。
venomD
252 天前
1. 弄个死信队列,处理异常的直接丢到死信队列里
2. 丢到数据库里,mysql ,redis ?定时任务扫描处理,成功后删除
fengpan567
252 天前
线程池异步处理有问题,消息如果在线程池的队列中,重启服务的时候数据不就丢了。还不如写到中间表,定时去捞未处理数据
lsk569937453
252 天前
@fengpan567 重启服务的时候内存的数据是没有了,但是因为消息没有 commit ,所以可以从 mq 再次消费没有 commit 的消息。
4kingRAS
252 天前
@apisces 不阻塞 kafka 线程啊,同步的话 kafka 线程会阻塞到你提交为止
julyclyde
252 天前
消费和处理分开,这设计是不是有点问题啊?

是不是应该改成各线程分别消费并就地处理?
jfds
252 天前
不用线程池消息会堆积么,干嘛要增加消费速度。MQ 已经是异步的了,一般对 rt 不敏感,为了这一点速度搞异步链路引出新的问题不值得。
zhhmax
252 天前
办法很多,比如说消息 ID 放到 redis ,处理成功就去更新一下 redis 这个消息的自动过期时间等自动过期就行,一直没过期的就是处理失败的,如何确定一直失败呢,消费一次计数器加一即可。这样也不影响其他线程消费队列导致阻塞。失败到一定次数还可以加入其他逻辑人工干预。
Luckyshot
252 天前
消息表+定时任务
rainbowStay
252 天前
@zhhmax 即使消息 ID 放到 redis ,那消费失败了消息耶没法重发啊?

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/982892

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX