V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
dongfuye1
V2EX  ›  推广

用 Go 轻松完成一个 SAGA 分布式事务,保姆级教程

  •  
  •   dongfuye1 · 2021-08-24 10:30:30 +08:00 · 1880 次点击
    这是一个创建于 1182 天前的主题,其中的信息可能已经有所发展或是发生改变。

    银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID,只能够通过分布式事务来解决。

    分布式事务

    分布式事务在分布式环境下,为了满足可用性、性能与降级服务的需要,降低一致性与隔离性的要求,一方面遵循 BASE 理论:

    • 基本业务可用性( Basic Availability )
    • 柔性状态( Soft state )
    • 最终一致性( Eventual consistency )

    另一方面,分布式事务也部分遵循 ACID 规范:

    • 原子性:严格遵循
    • 一致性:事务完成后的一致性严格遵循;事务中的一致性可适当放宽
    • 隔离性:并行事务间不可影响;事务中间结果可见性允许安全放宽
    • 持久性:严格遵循

    SAGA

    Saga 是这一篇数据库论文SAGAS提到的一个分布式事务方案。其核心思想是将长事务拆分为多个本地短事务,由 Saga 事务协调器协调,如果各个本地事务成功完成那就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。

    目前可用于 SAGA 的开源框架,主要为 Java 语言,其中以 seata 为代表。我们的例子采用 go 语言,使用的分布式事务框架为https://github.com/yedf/dtm,它对分布式事务的支持非常优雅。下面来详细讲解 SAGA 的组成:

    DTM 事务框架里,有 3 个角色,与经典的 XA 分布式事务一样:

    • AP/应用程序,发起全局事务,定义全局事务包含哪些事务分支
    • RM/资源管理器,负责分支事务各项资源的管理
    • TM/事务管理器,负责协调全局事务的正确执行,包括 SAGA 正向 /逆向操作的执行

    下面看一个成功完成的 SAGA 时序图,就很容易理解 SAGA 分布式事务:

    image.png

    SAGA 实践

    对于我们要进行的银行转账的例子,我们将在正向操作中,进行转入转出,在补偿操作中,做相反的调整。

    首先我们创建账户余额表:

    CREATE TABLE dtm_busi.`user_account` (
      `id` int(11) AUTO_INCREMENT PRIMARY KEY,
      `user_id` int(11) not NULL UNIQUE ,
      `balance` decimal(10,2) NOT NULL DEFAULT '0.00',
      `create_time` datetime DEFAULT now(),
      `update_time` datetime DEFAULT now()
    );
    
    

    我们先编写核心业务代码,调整用户的账户余额

    func qsAdjustBalance(uid int, amount int) (interface{}, error) {
    	_, err := dtmcli.SdbExec(sdbGet(), "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)
    	return dtmcli.ResultSuccess, err
    }
    

    下面我们来编写具体的正向操作 /补偿操作的处理函数

    	app.POST(qsBusiAPI+"/TransIn", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
    		return qsAdjustBalance(2, 30)
    	}))
    	app.POST(qsBusiAPI+"/TransInCompensate", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
    		return qsAdjustBalance(2, -30)
    	}))
    	app.POST(qsBusiAPI+"/TransOut", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
    		return qsAdjustBalance(1, -30)
    	}))
    	app.POST(qsBusiAPI+"/TransOutCompensate", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
    		return qsAdjustBalance(1, 30)
    	}))
    

    到此各个子事务的处理函数已经 OK 了,然后是开启 SAGA 事务,进行分支调用

    	req := &gin.H{"amount": 30} // 微服务的载荷
    	// DtmServer 为 DTM 服务的地址
    	saga := dtmcli.NewSaga(DtmServer, dtmcli.MustGenGid(DtmServer)).
    		// 添加一个 TransOut 的子事务,正向操作为 url: qsBusi+"/TransOut", 逆向操作为 url: qsBusi+"/TransOutCompensate"
    		Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req).
    		// 添加一个 TransIn 的子事务,正向操作为 url: qsBusi+"/TransOut", 逆向操作为 url: qsBusi+"/TransInCompensate"
    		Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req)
    	// 提交 saga 事务,dtm 会完成所有的子事务 /回滚所有的子事务
    	err := saga.Submit()
    
    

    至此,一个完整的 SAGA 分布式事务编写完成。

    如果您想要完整运行一个成功的示例,那么按照 yedf/dtm 项目的说明搭建好环境之后,通过下面命令运行 saga 的例子即可:

    go run app/main.go quick_start
    

    处理网络异常

    假设提交给 dtm 的事务中,调用转入操作时,出现短暂的故障怎么办?按照 SAGA 事务的协议,dtm 会重试未完成的操作,这时我们要如何处理?故障有可能是转入操作完成后出网络故障,也有可能是转入操作完成中出现机器宕机。如何处理才能够保障账户余额的调整是正确无问题的?

    DTM 提供了子事务屏障功能,保证多次重试,只会有一次成功提交。(子事务屏障不仅保证幂等,还能够解决空补偿等问题,详情参考分布式事务最经典的七种解决方案的子事务屏障环节)

    我们把处理函数调整为:

    func sagaBarrierAdjustBalance(sdb *sql.Tx, uid int, amount int) (interface{}, error) {
    	_, err := dtmcli.StxExec(sdb, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)
    	return dtmcli.ResultSuccess, err
    
    }
    
    func sagaBarrierTransIn(c *gin.Context) (interface{}, error) {
    	return dtmcli.ThroughBarrierCall(sdbGet(), MustGetTrans(c), func(sdb *sql.Tx) (interface{}, error) {
    		return sagaBarrierAdjustBalance(sdb, 1, reqFrom(c).Amount)
    	})
    }
    
    func sagaBarrierTransInCompensate(c *gin.Context) (interface{}, error) {
    	return dtmcli.ThroughBarrierCall(sdbGet(), MustGetTrans(c), func(sdb *sql.Tx) (interface{}, error) {
    		return sagaBarrierAdjustBalance(sdb, 1, -reqFrom(c).Amount)
    	})
    }
    

    这里的 dtmcli.TroughBarrierCall 调用会使用子事务屏障技术,保证第三个参数里的回调函数仅被处理一次​

    您可以尝试多次调用这个 TransIn 服务,仅有一次余额调整。您可以运行以下命令,运行新的处理方式:

    go run app/main.go saga_barrier
    

    处理回滚

    假如银行将金额准备转入用户 2 时,发现用户 2 的账户异常,返回失败,会怎么样?我们调整处理函数,让转入操作返回失败

    func sagaBarrierTransIn(c *gin.Context) (interface{}, error) {
    	return dtmcli.ResultFailure, nil
    }
    

    我们给出事务失败交互的时序图

    image.png

    这里有一点,TransIn 的正向操作什么都没有做,就返回了失败,此时调用 TransIn 的补偿操作,会不会导致反向调整出错了呢?

    不用担心,前面的子事务屏障技术,能够保证 TransIn 的错误如果发生在提交之前,则补偿为空操作; TransIn 的错误如果发生在提交之后,则补偿操作会将数据提交一次。

    您可以将返回错误的 TransIn 改成:

    func sagaBarrierTransIn(c *gin.Context) (interface{}, error) {
    	dtmcli.ThroughBarrierCall(sdbGet(), MustGetTrans(c), func(sdb *sql.Tx) (interface{}, error) {
    		return sagaBarrierAdjustBalance(sdb, 1, 30)
    	})
    	return dtmcli.ResultFailure, nil
    }
    

    最后的结果余额依旧会是对的,原理可以参考:分布式事务最经典的七种解决方案的子事务屏障环节

    小结

    在这篇文章里,我们介绍了 SAGA 的理论知识,也通过一个例子,完整给出了编写一个 SAGA 事务的过程,涵盖了正常成功完成,异常情况,以及成功回滚的情况。相信读者通过这边文章,对 SAGA 已经有了深入的理解。

    文中使用的 dtm 是新开源的 Golang 分布式事务管理框架,功能强大,支持 TCC 、SAGA 、XA 、事务消息等事务模式,支持 Go 、python 、PHP 、node 、csharp 等语言的。同时提供了非常简单易用的接口。

    阅读完此篇干货,欢迎大家访问项目https://github.com/yedf/dtm,给颗星星支持!

    tinkerer
        1
    tinkerer  
       2021-08-24 11:06:57 +08:00
    starred + 已感谢
    cholerae
        2
    cholerae  
       2021-08-24 12:55:54 +08:00   ❤️ 1
    老哥,库是个好库,但是没必要这样宣传吧,就这一个 repo,你这已经发了几个帖子了?我点进去看了眼你的发帖记录,近 21 天已经发了 9 个帖子了。

    这是背了公司内部给定的 kpi 要求吗
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5590 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 08:48 · PVG 16:48 · LAX 00:48 · JFK 03:48
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.