@wangxin3 代码逻辑大概是这样：考虑以下方法在多实例下运行，消费组是同一个，消费者名称是写死的。
/**
 * Creates a stream listener container for {@code MESSAGE_STREAM}, subscribes
 * {@code messageListener} to it as a member of {@code MESSAGE_GROUP}, stores the
 * container in {@code this.container}, and starts it.
 *
 * <p>The container polls with a batch size of 10 and a poll timeout of 5 seconds,
 * runs on {@code executor}, and converts record payloads to {@code ChatGroupUserDTO}.
 *
 * <p>NOTE(review): the consumer name is the fixed literal {@code "consumer-1"}. If
 * several application instances run this method against the same group, they all
 * subscribe under that single name; confirm this is intended.
 *
 * @param args application arguments; not read by this method
 * @throws Exception declared by the overridden signature
 */
@Override
public void run(ApplicationArguments args) throws Exception {
    StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, ChatGroupUserDTO>> containerOptions =
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                    .batchSize(10)
                    .executor(executor)
                    .pollTimeout(Duration.ofSeconds(5))
                    .targetType(ChatGroupUserDTO.class)
                    .build();

    StreamMessageListenerContainer<String, ObjectRecord<String, ChatGroupUserDTO>> listenerContainer =
            StreamMessageListenerContainer.create(redisConnectionFactory, containerOptions);

    // Presumably ensures the stream and consumer group exist before subscribing;
    // the helper is defined elsewhere, so verify against its implementation.
    prepareChannelAndGroup(redisTemplate.opsForStream(), MESSAGE_STREAM, MESSAGE_GROUP);

    // Subscribe as a group consumer, reading from the group's last-consumed offset.
    Consumer groupConsumer = Consumer.from(MESSAGE_GROUP, "consumer-1");
    StreamOffset<String> readFrom = StreamOffset.create(MESSAGE_STREAM, ReadOffset.lastConsumed());
    listenerContainer.receive(groupConsumer, readFrom, messageListener);

    // Keep a reference on the instance only once the subscription is registered.
    this.container = listenerContainer;

    // Start listening
    this.container.start();
    logger.info("{}启动成功", MESSAGE_STREAM);
}