问题大概描述是:
邮件发送,消费者数量是 5-20,有时候会阻塞(问题还不清楚)导致消费者无法继续处理队列中的消息
我的处理方式是重启 tomcat,重启果然是万能的,重启后,就继续读取消息了。
但不可能天天守着看然后重启一下吧
于是乎,我就搜了相关的 ActiveMQ 的文章 https://blog.csdn.net/ma15732625261/article/details/81267963 里面讲了 SlowConsumerStrategy:慢速消费者策略,但是我配置了,无效果
<policyEntry queue=">" producerFlowControl="true" memoryLimit="512mb">
<slowConsumerStrategy>
<abortSlowConsumerStrategy abortConnection="false"/>
</slowConsumerStrategy>
</policyEntry>
我用了下 RocketMQ,也遇到了类似的问题,consumeTimeout 也没效果
我的理解是:配置了 consumeTimeout,超时之后,就处理下一个消息
package cn.msb.rocketmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic-1",
consumerGroup = "my-consumer_test-topic-1",
selectorExpression = "first",
selectorType = SelectorType.TAG,
consumeThreadMax = 1, consumeTimeout = 1000)
public class MyConsumer1 implements RocketMQListener<String> {
public void onMessage(String message) {
if(message.contains("1")) {
try {
System.out.println("1 阻塞中。。。");
Thread.sleep(1000*60*60);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("received message: {}", message);
}
}
我想达到的效果是:消费者处理超时后就终止执行,让给下个消息进行处理
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.