RabbitMQ 增加多消费者导致生产者推送消息阻塞(10s~30s) 设备上传数据到系统 A(netty),系统 A 根据数据类型推送消息到不同的队列,因为设备量增多的原因,之前单消费者开始处理的不及时,就想着多增加个消费者(和之前的消费者代码一样),然后系统 A 推送消息开始出现卡顿,数据帧应答的很慢,感觉不像是流控的事,管理端看着也没问题 相关代码: 系统 A:
channelRead(ChannelHandlerContext ctx, Object msg){
....
sendAck(ctx,ack);
switch (data.getClass().getName()) {
case "realTimeData":
RabbitUtil.getInstance().publish(realTimeData);
}
}
publish(RealTimeData realTimeData){
.......
Map<String, Object> header = new HashMap<String, Object>();
header.put("DataType", "RealTimeData");
BasicProperties props = new BasicProperties().builder().headers(header).build();
channel.basicPublish(exchangeName, routeKey_CollectedData, props, CollectedRealTimeDataPackageTransform.toBytes(data));
}
channel init:
private Channel channel;
private ConnectionFactory factory = new ConnectionFactory();
@PostConstruct
public void init() {
instance = this;
factory.setUsername(mqUserName);
factory.setPassword(mqPassword);
factory.setHost(mqHost);
factory.setVirtualHost(mqVirtualHost);
factory.setPort(mqPort);
}
channel = factory.newConnection().createChannel();
}
消费者代码:
@Autowired
DataProcessor processor;
@Autowired
@Qualifier("threadpool")
ThreadPoolExecutor threadPool;
@RabbitListener(queues = "${mq.queue.Original.CollectedData}", ackMode = "MANUAL")
public void process(Message msg, Channel channel) {
MessageProperties mp = msg.getMessageProperties();
Map<String, Object> headers = mp.getHeaders();
String dataType = (String) headers.get("DataType");
switch (dataType) {
case "RealTimeData":
CompletableFuture.runAsync(() -> {
try {
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
CollectedRealTimeData crtd = CollectedRealTimeDataPackageTransform.fromBytes(msg.getBody());
processor.process(crtd);
} catch (Exception e) {
try {
channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
} catch (IOException ioException) {
ioException.printStackTrace();
}
e.printStackTrace();
}
}, threadPool);
break;
}
}
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.