问题最近想试试 kafka 的相关操作,在 spring-kafka 3.0.1 版本下。发现当我指定消费者去消费指定分区时,kafka 无法检测到该消费者(消费组内无消费者),但是却可以正常消费。
如果不指定,而是由 kafka 自动平衡消费者的话,就可以检测到
// 该代码正常
@KafkaListener(id = "consumer-all", clientIdPrefix = "consumer-all", topics = "topic1", groupId = "mygroup", concurrency = "1")
public void listenAllOne(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
System.out.println("这是第一个消费者-----"+consumerRecord.toString());
ack.acknowledge();
}
// 该消费者可以正常消费,但是无法被 kafka 检测到
@KafkaListener(id="consumer-1", clientIdPrefix = "consumer-1", topicPartitions = {@TopicPartition(topic = "topic2", partitionOffsets = @PartitionOffset(partition = "0,1", initialOffset = "0"))}, groupId = "mygroup1", concurrency = "1")
public void listenTop1(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
System.out.println("这是第一个分区消费者-----"+consumerRecord.toString());
ack.acknowledge();
}
@KafkaListener(id="consumer-2", clientIdPrefix = "consumer-2", topicPartitions = {@TopicPartition(topic = "topic2", partitionOffsets = @PartitionOffset(partition = "2,3", initialOffset = "0"))}, groupId = "mygroup1", concurrency = "1")
public void listenTop2(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
System.out.println("这是第二个分区消费者-----"+consumerRecord.toString());
ack.acknowledge();
}
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.