[Kafka] 求助, 同一个服务如何组播消费 kafka 某个 topic 的消息呢?

2020-08-21 09:20:30 +08:00
 BBCCBB

我现在用的是启动的时候动态生成 groupId, 比如 name + uuid 的方式

但是这样重启后就会导致原来的 consumerGroup 对应的实例都被销毁了.但 kafka 里依然存在原来的 consumerGroup, 监控上看已经被销毁的 consumerGroup 也会发现堆积越来越严重, 有谁知道正确的使用姿势吗??

不胜感激

😢

3112 次点击
所在节点    Kafka
31 条回复
useben
2020-08-21 09:43:14 +08:00
生成唯一 groupId, 存到文件, 启动时读文件, 有就用原来的, 没有再生成写到文件...
BBCCBB
2020-08-21 09:53:11 +08:00
@useben 这个在固定机器上是可以的, 但我们这里 docker 镜像每次都不知道到哪个机器上了. 😢
SingeeKing
2020-08-21 09:54:10 +08:00
环境变量?
hustmisa
2020-08-21 09:55:02 +08:00
首先 consumer 的使用业务需求是什么,新启动的 groupId 对启动期间的数据是否可以丢弃?
如果可以丢弃 kafka 配置 retention 很短就可以了,这样不会堆积;如果不能丢弃,配置 retention 长一些这样重启换 groupId 也能续接数据(大概是你现在的方式),但是就不要频繁换 groupId 了啊。我是这么理解的不知道对不对
BBCCBB
2020-08-21 10:08:29 +08:00
@SingeeKing 都是同一个服务, 不同实例,这个不方便给每个实例加环境变量, 哈哈

@hustmisa 可以丢弃, 因为应用上有 ack 超时重试机制, 要命的是重启后老的 groupId 不会自动心跳超时消失, 会在监控上看到消息不断堆积. 其实我想实现的就是 rocketmq 的广播的功能.. 但我们使用 kafka.
mosesyou
2020-08-21 10:11:18 +08:00
为什么用唯一或者固定的 groupId 不行
BBCCBB
2020-08-21 10:19:32 +08:00
@mosesyou 这个业务场景需要同一个服务里的不同实例全都消费到每个 MQ
zardly666
2020-08-21 10:28:17 +08:00
用 redis 做一个类似选 ID 的东西,服务启动份数等于 ID 份数。

for (int i = 0; i < 启动份数; i++) {
if (redisUtil.setnx( + i, “lockthing”,time )) {
bucketConfig.setConsumeZsetBucketNum(i);
log.info("此实例的消费者为" + i);
break;
}
}

服务启动的时候,第一个服务拿到 consumerId+1 ;
第二个服务拿到 consumerId+2 ;
这样,就复用几个了吧。
zardly666
2020-08-21 10:30:00 +08:00
代码没删干净,大概意思就是服务启动动态去拿自己所属的 consumer
wisej
2020-08-21 10:30:58 +08:00
另一种思路,服务同一个 groupid,分发由服务自己来做(拿到服务其它实例的 ip )

另外旧 cg 堆积会有什么负面影响么?除了消息会冗余地保存,直到 retention 设置的时间被清除
sonice
2020-08-21 10:40:54 +08:00
想多了,consumerGroup 堆积能有多少,起停一次多一个,也不会有很多啊。这也不会导致 zk 性能降低啊
amwyyyy
2020-08-21 11:04:23 +08:00
原来 consumerGroup 的堆积只是个数字,消息数据只有一份,不管你有几个 consumerGroup 。过期的 consumerGroup 会被清理掉。
kifile
2020-08-21 11:05:34 +08:00
我的理解,题主的意思是因为 ConsumerGroup 的 GroupId 每次重启会重新生成一个新的,导致监控面板上出现了废弃的 groupId 的 Lag 不断增大的现象。

如果重启时 Consumer 的 offset 没有什么意义,那就在重启新应用前,删除老的 ConsumerGroup,做一个这种策略不就好了?
BBCCBB
2020-08-21 11:13:58 +08:00
@zardly666 这个倒是可以做, 类似 snowflake 算法 workid 的生成. 但相对较麻烦, 老哥但还有没有简单点的解决办法啊 😿
@wisej consumergroup 堆积就是监控上看着有点慌. 磁盘会占用.
@sonice 额, 我们多个实例发布的时候重启, 那就会一次性有多个 old consumerGroup, 监控上看着蛇皮的很, 比较难搞.
@kifile 是, 这是一种方案, 但开发没有 KafkaAdmin 的权限, 所以代码里删除不掉, 只能手动了.... 哈哈


谢谢各位, 期待更好的方案 🐶
mosesyou
2020-08-21 11:23:21 +08:00
纯 docker 么,如果是 k8s 的话,用 statefulset,可以实现每个实例有固定递增编码 0,1,2....
BBCCBB
2020-08-21 11:32:54 +08:00
@mosesyou 我们将 docker 镜像上传到云上, 然后后续的流程我得研究一下, 问一下我们负责这一块的同事, 如果可行的话这得确是一个好办法. 多谢.
yangbonis
2020-08-21 11:35:21 +08:00
mq 不是本来就组播工作的?所有订阅都会收到。
BBCCBB
2020-08-21 11:37:57 +08:00
@yangbonis 是需要同一个服务不同实例都收到, 你说的这个大概是不同服务.
j2gg0s
2020-08-21 12:39:55 +08:00
@BBCCBB 瞎逼设计,每个实例根据消息在自己的内存里面做些什么工作吗?不能搞个 redis 或者 db ?

然后,kafka 的监控看到堆积是没有什么大影响的,因为消息只存一份。
如果你觉得不爽,可以在实例 shutdown 的时候了,把 consumergroup 注销掉?
j2gg0s
2020-08-21 12:40:34 +08:00
@j2gg0s 每次重启,重头还是重新开始消费呢?

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/700133

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX