//生产者
RabbitTemplate.ConfirmCallback confirmCallback =
new RabbitTemplate.ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String messageId = correlationData.getId();
if (ack) {
//如果 confirm 返回成功 则进行更新
System.err.println("收到 confirm 为 true");
brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());
} else {
//失败则进行具体的后续操作:重试 或者补偿等手段
System.err.println("异常处理...");
}
}
};
//消费者
@RabbitHandler
public void onOrderMessage(
@Payload Order order,
Channel channel,
@Headers Map<String, Object> headers
) throws Exception {
System.err.println("订单 id" + order.getId());
Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//手工确认签收 ,消费者消费完消息主动的告诉 mq 我已经消费完了,false 是不支持批量接受
//一般的工作中也是使用手动的 ack,消费完之后要确认
channel.basicAck(deliverTag, false);
}
在消费端未启动的情况下,生产者的 ack 也为 true,这是为什么
如何才能在 consumer 中按照业务处理后,返回给生产者是否成功消费
1
LeeSeoung 2019-09-10 14:45:07 +08:00
ConfirmCallback 只能确认消息是否发到 exchange
还有 ReturnCallback 能够确认消息是否投递到对应的 queue 中,但是怎么知道消费者是否成功消费这个,成功的时候就不知道了,失败的时候可以手动确认要求 requeue,我记得这个时候在 header 里是有个标记的,你可以查查。 这两个 callback 需要配置 publisher-confirms: true、publisher-returns: true |
2
hihipp 2019-09-10 14:55:13 +08:00
|
3
lolizeppelin 2019-09-10 14:57:49 +08:00
openstack 里的对于需要回复的消息的做法是
在发送消息包里带一个 dict 的 route key 用于回包 这个 route key 是唯一的, 一般在进程创建的时候生成一个随机 key 作为用户给别人回包的 route key,注意进程需要每个进程有独立的可 route key |
4
FuryLeeU 2019-09-10 15:06:40 +08:00
.
|
5
freebird1994 2019-09-10 16:32:06 +08:00
第二种我不知道和 RPC 有什么区别。把消息队列用成阻塞的有啥意义
|