RocketMQ filter message: messages lost when consuming with a tag filter

2020-10-18 13:13:03 +08:00
 RedBeanIce

Linux, RocketMQ 4.7.1

rocketmq-spring-boot-starter 2.1.1

Producer code

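import java.io.UnsupportedEncodingException;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class FilterProducerTag {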
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("192.168.42.131:9876");
        producer.start();

        String[] stringArr = {"TagA", "TagB", "TagC"};
        for (int i = 0; i < 6; i++) {
            Message message = new Message();
            message.setTopic("TopicTestFilter");
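            // Tag rotation across TagA/TagB/TagC is disabled; every message is tagged TagA.
            // message.setTags(stringArr[i % stringArr.length]);
            message.setTags("TagA");
            message.setKeys("KEY" + i);
            message.setBody(("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

            // Send synchronously; the SendResult reports the target queue and offset.
            SendResult sendResult = producer.send(message);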
            System.out.println("=========================================");
            System.out.println(sendResult);
            System.out.println(message);
            System.out.println("=========================================");
        }
        producer.shutdown();
    }
}

Consumer code, as follows:

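import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class FilterConsumerTag {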
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("192.168.42.131:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Disabled SQL92 filter: only subscribe to messages that have property a, with 0 <= a <= 2.
        // consumer.subscribe("TopicTestFilter", MessageSelector.bySql("a between 0 and 2"));

        // Tag filter: receive messages tagged TagA or TagB.
        consumer.subscribe("TopicTestFilter", "TagA || TagB");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
                for (MessageExt messageExt : messageExtList) {
                    System.out.println("========================================================");
                    System.out.println(messageExt);
                    System.out.println(new String(messageExt.getBody(), StandardCharsets.UTF_8));
                    System.out.println("========================================================");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer");
    }
}
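
To check whether the subscription expression itself could be dropping messages, here is a minimal sketch of how a tag expression such as "TagA || TagB" can be evaluated against a message tag. It is a simplified model that assumes plain string comparison, not RocketMQ's actual implementation, and the class and method names (TagExpressionSketch, parseTags, matches) are made up for illustration.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class TagExpressionSketch {
    // Splits an expression such as "TagA || TagB" into its individual tags.
    public static Set<String> parseTags(String expression) {
        Set<String> tags = new LinkedHashSet<>();
        for (String part : expression.split("\\|\\|")) {
            String tag = part.trim();
            if (!tag.isEmpty()) {
                tags.add(tag);
            }
        }
        return tags;
    }

    // A message passes when the expression is "*" or lists the message's tag.
    public static boolean matches(String expression, String messageTag) {
        if (expression == null || expression.trim().equals("*")) {
            return true;
        }
        return messageTag != null && parseTags(expression).contains(messageTag);
    }

    public static void main(String[] args) {
        String expression = "TagA || TagB";
        for (String tag : new String[] {"TagA", "TagB", "TagC"}) {
            System.out.println(tag + " -> " + matches(expression, tag));
        }
    }
}
```

Under this model every message tagged TagA passes the expression, so the tag filter alone would not be expected to drop any of the six messages sent by the producer.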

Problem

The consumer receives only four of the producer's messages. By rights it should receive all six.

Logs

Producer log

RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
=========================================
SendResult [sendStatus=SEND_OK, msgId=64774C6B275C18B4AAC25A56EFC60000, offsetMsgId=C0A82A8300002A9F00000000000CF5C0, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=0], queueOffset=31]
Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY0, UNIQ_KEY=64774C6B275C18B4AAC25A56EFC60000, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}
=========================================
=========================================
SendResult [sendStatus=SEND_OK, msgId=64774C6B275C18B4AAC25A56EFEE0001, offsetMsgId=C0A82A8300002A9F00000000000CF68F, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=1], queueOffset=30]
Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY1, UNIQ_KEY=64774C6B275C18B4AAC25A56EFEE0001, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}
=========================================
=========================================
SendResult [sendStatus=SEND_OK, msgId=64774C6B275C18B4AAC25A56EFF80002, offsetMsgId=C0A82A8300002A9F00000000000CF75E, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=2], queueOffset=23]
Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY2, UNIQ_KEY=64774C6B275C18B4AAC25A56EFF80002, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}
=========================================
=========================================
SendResult [sendStatus=SEND_OK, msgId=64774C6B275C18B4AAC25A56EFFB0003, offsetMsgId=C0A82A8300002A9F00000000000CF82D, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=3], queueOffset=22]
Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY3, UNIQ_KEY=64774C6B275C18B4AAC25A56EFFB0003, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51], transactionId='null'}
=========================================
=========================================
SendResult [sendStatus=SEND_OK, msgId=64774C6B275C18B4AAC25A56EFFD0004, offsetMsgId=C0A82A8300002A9F00000000000CF8FC, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=0], queueOffset=32]
Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY4, UNIQ_KEY=64774C6B275C18B4AAC25A56EFFD0004, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='null'}
=========================================
=========================================
SendResult [sendStatus=SEND_OK, msgId=64774C6B275C18B4AAC25A56F0020005, offsetMsgId=C0A82A8300002A9F00000000000CF9CB, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=1], queueOffset=31]
Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY5, UNIQ_KEY=64774C6B275C18B4AAC25A56F0020005, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53], transactionId='null'}
=========================================
13:00:47.034 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.42.131:10911] result: true
13:00:47.036 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.42.131:9876] result: true

Process finished with exit code 0

Consumer log

========================================================
========================================================
MessageExt [brokerName=localhost.localdomain, queueId=0, storeSize=207, queueOffset=31, sysFlag=0, bornTimestamp=1602997246918, bornHost=/192.168.42.1:6700, storeTimestamp=1602997246973, storeHost=/192.168.42.131:10911, msgId=C0A82A8300002A9F00000000000CF5C0, commitLogOffset=849344, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTestFilter', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=33, KEYS=KEY0, CONSUME_START_TIME=1602997259086, UNIQ_KEY=64774C6B275C18B4AAC25A56EFC60000, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]
Hello RocketMQ 0
MessageExt [brokerName=localhost.localdomain, queueId=0, storeSize=207, queueOffset=32, sysFlag=0, bornTimestamp=1602997246973, bornHost=/192.168.42.1:6700, storeTimestamp=1602997247002, storeHost=/192.168.42.131:10911, msgId=C0A82A8300002A9F00000000000CF8FC, commitLogOffset=850172, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTestFilter', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=33, KEYS=KEY4, CONSUME_START_TIME=1602997259086, UNIQ_KEY=64774C6B275C18B4AAC25A56EFFD0004, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='null'}]
========================================================
========================================================
========================================================
Hello RocketMQ 4
========================================================
MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=207, queueOffset=31, sysFlag=0, bornTimestamp=1602997246978, bornHost=/192.168.42.1:6700, storeTimestamp=1602997247011, storeHost=/192.168.42.131:10911, msgId=C0A82A8300002A9F00000000000CF9CB, commitLogOffset=850379, bodyCRC=1424393152, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTestFilter', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=32, KEYS=KEY5, CONSUME_START_TIME=1602997259086, UNIQ_KEY=64774C6B275C18B4AAC25A56F0020005, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53], transactionId='null'}]
MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=207, queueOffset=30, sysFlag=0, bornTimestamp=1602997246958, bornHost=/192.168.42.1:6700, storeTimestamp=1602997246992, storeHost=/192.168.42.131:10911, msgId=C0A82A8300002A9F00000000000CF68F, commitLogOffset=849551, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTestFilter', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=32, KEYS=KEY1, CONSUME_START_TIME=1602997259086, UNIQ_KEY=64774C6B275C18B4AAC25A56EFEE0001, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]
Hello RocketMQ 5
========================================================
Hello RocketMQ 1
========================================================
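
One way to narrow this down is to compare the KEYS and queueId values in the producer log with the KEYS that show up in the consumer log. The sketch below does that with the values copied from the logs above; the class and method names (DeliveryGapSketch, missingByQueue) are hypothetical and it uses only the JDK, not the RocketMQ client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class DeliveryGapSketch {
    // Returns, per queue ID, the keys that were sent but never received.
    public static Map<Integer, List<String>> missingByQueue(
            Map<String, Integer> sentKeyToQueue, Set<String> receivedKeys) {
        Map<Integer, List<String>> missing = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : sentKeyToQueue.entrySet()) {
            if (!receivedKeys.contains(entry.getKey())) {
                missing.computeIfAbsent(entry.getValue(), q -> new ArrayList<>())
                       .add(entry.getKey());
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // KEYS -> queueId, copied from the SendResult lines in the producer log.
        Map<String, Integer> sent = new LinkedHashMap<>();
        sent.put("KEY0", 0);
        sent.put("KEY1", 1);
        sent.put("KEY2", 2);
        sent.put("KEY3", 3);
        sent.put("KEY4", 0);
        sent.put("KEY5", 1);

        // KEYS that appear in the consumer log.
        Set<String> received = new TreeSet<>(Arrays.asList("KEY0", "KEY4", "KEY5", "KEY1"));

        System.out.println(missingByQueue(sent, received)); // prints {2=[KEY2], 3=[KEY3]}
    }
}
```

The two messages that never arrive (KEY2 and KEY3) are the only ones sent to queues 2 and 3, while everything sent to queues 0 and 1 is delivered. The gap therefore looks queue-specific rather than tag-specific; this is an observation from the logs, not a confirmed cause.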
