疑问:事务消息和事务中提交消息的差异

2021-05-27 21:53:42 +08:00
 SilenceLL

请教一下,基于 MQ 的事务消息比如 RocketMQ 的事务消息和在事务中投递消息,如果投递消息失败则回滚的差异在哪里了。

新手上路,多多包涵。

1539 次点击
所在节点    程序员
10 条回复
zhgg0
2021-05-27 22:51:59 +08:00
在事务中投递消息,如果投递成功,接下来的事务失败咋回滚消息?
yeqizhang
2021-05-28 00:58:55 +08:00
无论如何都是要等本地事务执行成功后再尝试提交消息(事务消息在本地事务执行前只是半提交),如果提交消息的代码穿插在本地事务中间,就会出现一楼说的消息提交后本地事务又失败了怎么办。提交消息成功没有回滚这个说法,事务消息那个回滚是因为消息中间间支持了半提交的消息。

如果你用普通消息,本地事务执行完后最后发送消息失败,需要重试发送,或者对已经执行成功的本地事务进行处理。事务消息那是另外一种做法
fkname
2021-05-28 09:36:08 +08:00
事务消息是确保你发送的消息肯定在发送方是成功执行的,而在事务中发送消息消息会立刻发送出去,事务无法限制消息的发送。
SilenceLL
2021-05-28 09:46:09 +08:00
感谢各位热心解答 @zhgg0 @yeqizhang @fkname.

如果我把发送 half 消息放在一个事物中,比如下单的事物,确保发送 half 消息成功。
然后在 TransactionListener 的 executeLocalTransaction 方法中直接返回 LocalTransactionState.COMMIT,
在 checkLocalTransaction 中检查下单的事物是否已经提交成功。这种方式有什么问题吗?
SilenceLL
2021-05-28 10:08:42 +08:00
是基于失败的成本考虑吗?如果先发 half 消息失败成本低。如果先提交事务再发送 half 消息导致事务回滚成本高。
yeqizhang
2021-05-29 18:16:18 +08:00
@SilenceLL 你 4 楼说的不行,不应该将发送消息的代码放入到数据库事务中,否则在重试发送消息的过程中过多的重试会拉长数据库事务执行时间。

至于 5 楼说的和发普通消息没区别了,具体事务消息对此有何优点,我也没找到,网上很少有说回滚本地事务操作这块的内容的
yeqizhang
2021-05-29 23:27:48 +08:00
忽略我之前说的哈,是我之前学习时没搞明白并且也忘了。现在在这里补充一下我今天看的,当作在这做个笔记,如果打扰到楼主,感到抱歉~

首先我把标题改一下,楼主问的也挺好的,就是问 事务消息这种形式和在本地事务中耦合发送普通消息的差异。

在本地事务中耦合发送普通消息时,就是楼主一开始问的“在事务中投递消息,如果投递消息失败则回滚”,这个是没有问题的,最多是可能需要尝试发几次,本地事务很好回滚,这个没啥问题。 而“事务消息”,如果预发送和本地事务都执行成功了,那提交消息这一步到最后一定需要执行成功,没有回滚这一说。

而在本地事务中投递消息,会有什么问题呢,以下摘自 https://blog.csdn.net/zxjoke/article/details/105260252

// 开始事务
try {
// 1.执行数据库操作
// 2.发送 mq 消息
// 3.提交事务
}catch (Exception e){
// 4.回滚事务
}

上面代码看起来确实没什么问题,消息发送失败,回滚事务。
但是实际上第二步有可能存在消息已经发送到 MQ 服务端,但是由于网络问题未及时收到 MQ 的响应消息,从而导致消息发送端认为消息消息发送失败。
这就会导致订单事务回滚了,但是手续费系统却能消费消息,两边数据库又不一致了。
熟悉 MQ 的同学,可能会想到,消息发送失败,可以重试啊。
是的,我们可以增加重试次数,重新发送消息。但是这里我们需要注意,由于消息发送耦合在事务中,过多的重试会拉长数据库事务执行时间,事务处理时间过长,导致事务中锁的持有时间变长,影响整体的数据库吞吐量。
实际业务中,不太建议将消息发送耦合在数据库事务中。
SilenceLL
2021-05-30 11:32:44 +08:00
比如这样子:

public class Test {
@Transactional
public void createOrder() {
execCreateOrder();
sendHalfMessage();
commitCreateOrder();
}

@RocketMQTransactionListener
static class OrderTransactionListenerImpl implements RocketMQLocalTransactionListener {

@Autowired
private OrderService orderService;

@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info("----executeLocalTransaction----{}", msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID, String.class));
return RocketMQLocalTransactionState.COMMIT;
}

@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
log.info("----checkLocalTransaction----{}", msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID, String.class));
String keys = msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID, String.class);
Order order = orderService.query(orderId);
if (null == order) {
return RocketMQLocalTransactionState.UNKNOWN;
}
return RocketMQLocalTransactionState.COMMIT;
}
}
}
SilenceLL
2021-05-30 11:36:25 +08:00
SilenceLL
2021-05-30 11:39:36 +08:00
@SilenceLL 看起来跟楼上的朋友说的一样,如果 mq 异常,会导致事务处理过程持有锁的时间边长,影响数据库吞吐量。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/779670

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX