The business calls for batch consumption, pulling records straight off Kafka as a List&lt;ModelDTO&gt; and processing them in bulk.
private final String topic = "queue_notify";

@KafkaListener(topics = topic, containerFactory = "kafkaLiveListenerContainerFactory")
public void listen(List<PushLiveDTO> messages) {
    // the whole batch arrives in one call, already deserialized
}
private final String topic = "queue_push";

// containerFactory names the container factory bean configured below
@KafkaListener(topics = topic, containerFactory = "kafkaListenerContainerFactory")
public void listen(List<MallPushMongoDB> messages) {
    // element type assumed to be MallPushMongoDB, i.e. what PushJsonDeserializer produces
}
Each listener batch-pulls its own topic and receives the DTOs as a ready-made List for processing.
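The container factory below reads its settings from Spring configuration via @Value. A minimal sketch of the referenced properties — every value here is an illustrative placeholder, and note that concurrency and max-consumer-number are custom keys of this project rather than standard Spring Boot options:

spring:
  kafka:
    consumer:
      bootstrap-servers: 127.0.0.1:9092
      enable-auto-commit: true
      auto-commit-interval: 1000
      group-id: mall-push-group
      auto-offset-reset: latest
      concurrency: 3
      max-consumer-number: 100

With those properties in place, the consumer configuration class: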
package com.dg.mall.push.config;
import com.dg.mall.push.kafka.PushJsonDeserializer;
import com.dg.mall.push.kafka.LiveJsonDeserializer;
import com.dg.mall.push.kafka.listen.LiveNotifyListener;
import com.dg.mall.push.kafka.listen.PushListener;
import com.dg.mall.push.model.message.MallPushMongoDB;
import com.dg.mall.push.model.message.PushLiveDTO;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String servers;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${spring.kafka.consumer.auto-commit-interval}")
private String autoCommitInterval;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.consumer.concurrency}")
private int concurrency;
@Value("${spring.kafka.consumer.max-consumer-number}")
private Integer maxConsumerNumber;
@Bean
// value type assumed to be MallPushMongoDB, matching what PushJsonDeserializer produces
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MallPushMongoDB>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MallPushMongoDB> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// number of concurrent consumers
factory.setConcurrency(concurrency);
// hand the listener whole batches instead of single records
factory.setBatchListener(true);
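// how long each poll() blocks waiting for records, in milliseconds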
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory<String, MallPushMongoDB> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new PushJsonDeserializer());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// plug in our own value deserializer, PushJsonDeserializer
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, PushJsonDeserializer.class);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 40000);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
// cap each batch at max-consumer-number records (100 in our setup)
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxConsumerNumber);
return propsMap;
}
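// The live factory below mirrors kafkaListenerContainerFactory; only the value deserializer (and thus the DTO type) differs.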
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, PushLiveDTO>> kafkaLiveListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, PushLiveDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(liveConsumerFactory());
// number of concurrent consumers
factory.setConcurrency(concurrency);
// hand the listener whole batches instead of single records
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory<String, PushLiveDTO> liveConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(liveConsumerConfigs(), new StringDeserializer(), new LiveJsonDeserializer());
}
public Map<String, Object> liveConsumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// plug in our own value deserializer, LiveJsonDeserializer
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LiveJsonDeserializer.class);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 40000);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
// cap each batch at max-consumer-number records (100 in our setup)
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxConsumerNumber);
return propsMap;
}
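// Register the listener beans so their @KafkaListener methods are picked up.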
@Bean
public PushListener listener() {
return new PushListener();
}
@Bean
public LiveNotifyListener liveListener() {
return new LiveNotifyListener();
}
}

// Deserializer: turns raw record bytes from Kafka into PushLiveDTO
package com.dg.mall.push.kafka;
import com.dg.mall.push.model.message.PushLiveDTO;
import com.gexin.fastjson.JSON;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
public class LiveJsonDeserializer implements Deserializer<PushLiveDTO> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public PushLiveDTO deserialize(String topic, byte[] data) {
// fastjson parses the record value bytes straight into the DTO
return JSON.parseObject(data, PushLiveDTO.class);
}
@Override
public void close() {
}
}

// Serializer: the producer-side counterpart, writing DTOs as JSON bytes
package com.dg.mall.push.kafka;
import com.dg.mall.push.model.message.MallPushMongoDB;
import com.gexin.fastjson.JSON;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
public class MallPushJsonSerializer implements Serializer<MallPushMongoDB> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, MallPushMongoDB data) {
// fastjson renders the DTO as JSON bytes for the record value
return JSON.toJSONBytes(data);
}
@Override
public void close() {
}
}
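The original post never shows where MallPushJsonSerializer is wired in. As a reference, here is a minimal sketch of a producer configuration built around it — the class name, bean names, and the spring.kafka.producer.bootstrap-servers key are my assumptions, not part of the original project:

package com.dg.mall.push.config;

import java.util.HashMap;
import java.util.Map;

import com.dg.mall.push.kafka.MallPushJsonSerializer;
import com.dg.mall.push.model.message.MallPushMongoDB;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}") // assumed property key
    private String servers;

    @Bean
    public ProducerFactory<String, MallPushMongoDB> mallPushProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // keys stay plain strings; values are serialized to JSON bytes by our serializer
        return new DefaultKafkaProducerFactory<>(props, new StringSerializer(), new MallPushJsonSerializer());
    }

    @Bean
    public KafkaTemplate<String, MallPushMongoDB> mallPushKafkaTemplate() {
        return new KafkaTemplate<>(mallPushProducerFactory());
    }
}

Sending then becomes kafkaTemplate.send("queue_push", dto), and the DTO round-trips through JSON on both ends.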
Summary: consuming
@KafkaListener(topics = topic, containerFactory = "kafkaLiveListenerContainerFactory") — every listener must name the containerFactory that points at the matching container factory bean. That factory carries a value deserializer, which by default handles plain Strings; we replace it with one built for our own DTO, e.g. LiveJsonDeserializer, so the listener receives already-deserialized objects.
And finally, when the batch listener fires, every record from the poll arrives at once: the data all lands in the List in a single call.
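For completeness, a sketch of what a listener bean such as LiveNotifyListener can look like — the handle method is a hypothetical stand-in for the real business logic:

package com.dg.mall.push.kafka.listen;

import java.util.List;

import com.dg.mall.push.model.message.PushLiveDTO;
import org.springframework.kafka.annotation.KafkaListener;

public class LiveNotifyListener {

    private final String topic = "queue_notify";

    // the container factory has already deserialized every record value into a PushLiveDTO
    @KafkaListener(topics = topic, containerFactory = "kafkaLiveListenerContainerFactory")
    public void listen(List<PushLiveDTO> messages) {
        // at most max-consumer-number records arrive per invocation
        for (PushLiveDTO dto : messages) {
            handle(dto); // hypothetical per-record handler
        }
    }

    private void handle(PushLiveDTO dto) {
        // business processing goes here
    }
}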
end!