做网站用什么程序,网络推广优化方案,宁波其它区高端关键词优化,steam交易链接怎么用DTM分布式事务 从内网看到了关于事务在业务中的讨论#xff0c;评论区大佬有提及DTM开源项目[https://dtm.pub/]#xff0c;开学开学 基础理论
一、Why DTM 项目产生于实际生产中的问题#xff0c;涉及订单支付的服务会将所有业务相关逻辑放到一个大的本地事务#xff…DTM分布式事务 从内网看到了关于事务在业务中的讨论评论区大佬有提及DTM开源项目[https://dtm.pub/]开学开学 基础理论
一、Why DTM 项目产生于实际生产中的问题涉及订单支付的服务会将所有业务相关逻辑放到一个大的本地事务导致大量耦合复杂度大幅提升 java成熟的分布式事务解决方案使用代价过高大量业务用java重写 DTMDistributed Transaction Manager 其是一个分布式事务管理器解决跨数据库、跨服务、跨语言更新数据的一致性问题。
DTM提供了Saga、TCC、XA和二阶段消息模式以满足不同应用场景的需求同时首创的子事务屏障技术有效解决幂等、悬挂和空补偿等异常问题。
DTM的优点
提供简单易用的接口拆分具体业务接入分布式事务支持多语言栈核心技术子事务屏蔽降低处理子事务乱的难度
二、 快速上手
1、Demo
了解DTM的发展和特点quick start一下8⃣️
// 运行dtm
git clone https://github.com/dtm-labs/dtm cd dtm
go run main.go
// 运行一个saga示例
go run qs/main.go 上述Saga示例实现一个类似跨行转账的功能包括两个事务分支资金转出TransOut、资金转入TransIn。DTM保证TransIn和TransOut要么全成功要么全回滚保证最终金额的正确性。 // 具体业务微服务地址const qsBusi http://localhost:8081/api/busi_sagareq : gin.H{amount: 30} // 微服务的载荷// DtmServer为DTM服务的地址是一个urlDtmServer : http://localhost:36789/api/dtmsvrsaga : dtmcli.NewSaga(DtmServer, shortuuid.New()).// 添加一个TransOut的子事务正向操作为url: qsBusi/TransOut 补偿操作为url: qsBusi/TransOutCompensateAdd(qsBusi/TransOut, qsBusi/TransOutCompensate, req).// 添加一个TransIn的子事务正向操作为url: qsBusi/TransIn 补偿操作为url: qsBusi/TransInCompensateAdd(qsBusi/TransIn, qsBusi/TransInCompensate, req)// 提交saga事务dtm会完成所有的子事务/回滚所有的子事务err : saga.Submit()2、时序图 从以上时序图可以看出DTM整个全局事务分为如下几步
用户定义好全局事务所有的事务分支全局事务的组成部分称为事务分支然后提交给DTMDTM持久化全局事务信息后立即返回DTM取出第一个事务分支这里是TransOut调用该服务并成功返回DTM取出第二个事务分支这里是TransIn调用该服务并成功返回DTM已完成所有的事务分支将全局事务的状态修改为已完成
失败情况
在实际业务中子事务可以出现失败例如转入的子账号被冻结导致转账失败。因此可以对业务代码进行修改来模拟TransIn正向操作失败
func qsAddRoute(app *gin.Engine) {app.POST(qsBusiAPI/TransIn, func(c *gin.Context) {log.Printf(TransIn)c.JSON(200, )// c.JSON(409, ) // Status 409 for Failure. Wont be retried})app.POST(qsBusiAPI/TransInCompensate, func(c *gin.Context) {log.Printf(TransInCompensate)c.JSON(200, )})app.POST(qsBusiAPI/TransOut, func(c *gin.Context) {log.Printf(TransOut)c.JSON(200, )})app.POST(qsBusiAPI/TransOutCompensate, func(c *gin.Context) {log.Printf(TransOutCompensate)c.JSON(200, )})
} 再次运行整个事务最终失败时序图 在转入操作失败的情况下TransIn和TransOut的补偿操作被执行保证了最终的余额和转账前是一样的 三、二阶段消息Demo 业务场景 跨行转账是典型的分布式事务场景在这里A需要跨行转账给B 假设需求场景 只有转出A可能失败转入B是能够最终成功的 二阶段消息是DTM首创的事务模式用于替换本地事务表和事务消息这两种现有的方案
二阶段消息能够保证本地事务的提交和全局事务的提交是原子性的适合解决不需要回滚的分布式事务场景
1、核心代码
// SagaAdjustBalance 1
func SagaAdjustBalance(db dtmcli.DB, uid int, amount int, result string) error {if strings.Contains(result, dtmcli.ResultFailure) {return dtmcli.ErrFailure}_, err : dtmimp.DBExec(BusiConf.Driver, db, update dtm_busi.user_account set balance balance ? where user_id ?, amount, uid)return err
}调整用户的账号余额
app.POST(BusiAPI/SagaBTransIn, dtmutil.WrapHandler(func(c *gin.Context) interface{} {barrier : MustBarrierFromGin(c)return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)})}))barrier.Call主要用于处理幂等保证重复调用不会多次调整余额
开启事务进行分支调用 gid : dtmimp.GetFuncName()req : busi.GenReqHTTP(30, false, false)msg : dtmcli.NewMsg(DtmServer, gid).Add(busi.Busi/SagaBTransIn, req)err : msg.DoAndSubmitDB(Busi/QueryPreparedB, dbGet().ToSQLDB(), func(tx *sql.Tx) error {return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, SUCCESS)})该代码保证了DoAndSubmitDB的业务提交和全局事务提交是原子性的保证TransOut和TransIn同时成功or失败。 其中DoAndSubmitDB第一个参数为回查URL app.GET(BusiAPI/QueryPrepared, dtmutil.WrapHandler(func(c *gin.Context) interface{} {logger.Debugf(%s QueryPrepared, c.Query(gid))return string2DtmError(dtmimp.OrString(MainSwitch.QueryPreparedResult.Fetch(), dtmcli.ResultSuccess))}))app.GET(BusiAPI/QueryPreparedB, dtmutil.WrapHandler(func(c *gin.Context) interface{} {logger.Debugf(%s QueryPreparedB, c.Query(gid))bb : MustBarrierFromGin(c)db : dbGet().ToSQLDB()return bb.QueryPrepared(db)}))2、原子性 四、SAGA型事务Demo 业务场景 A需要跨行转账给B 假设场景 转出A和转入B都可能成功和失败需要最终转入转出都成功or失败 核心思想
将长事务拆分为多个本地短事务有Saga事务协调器来协调如果各个本地事务成功完成则正常完成如果某个步骤失败则根据相反顺序依次调用补偿操作。
1、核心代码
调整用户的账户余额
func SagaAdjustBalance(db dtmcli.DB, uid int, amount int, result string) error {if strings.Contains(result, dtmcli.ResultFailure) {return dtmcli.ErrFailure}_, err : dtmimp.DBExec(BusiConf.Driver, db, update dtm_busi.user_account set balance balance ? where user_id ?, amount, uid)return err
} 正向操作/补偿操作的处理函数源码新增的Demo并未在此作展示 app.POST(BusiAPI/SagaBTransIn, dtmutil.WrapHandler(func(c *gin.Context) interface{} {barrier : MustBarrierFromGin(c)return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)})}))app.POST(BusiAPI/SagaBTransInCom, dtmutil.WrapHandler(func(c *gin.Context) interface{} {barrier : MustBarrierFromGin(c)return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {return SagaAdjustBalance(tx, TransInUID, -reqFrom(c).Amount, )})}))app.POST(BusiAPI/SagaBTransOut, dtmutil.WrapHandler(func(c *gin.Context) interface{} {barrier : MustBarrierFromGin(c)return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {return SagaAdjustBalance(tx, TransOutUID, -reqFrom(c).Amount, reqFrom(c).TransOutResult)})}))app.POST(BusiAPI/SagaBTransOutCom, dtmutil.WrapHandler(func(c *gin.Context) interface{} {barrier : MustBarrierFromGin(c)return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {return SagaAdjustBalance(tx, TransOutUID, reqFrom(c).Amount, )})}))开启事务进行分支调用 req : busi.ReqHTTP{Amount: 30}// DtmServer为DTM服务的地址saga : dtmcli.NewSaga(dtmutil.DefaultHTTPServer, shortuuid.New()).// 添加一个TransOut的子事务正向操作为url: qsBusi/TransOut 逆向操作为url: qsBusi/TransOutComAdd(busi.Busi/SagaBTransOut, busi.Busi/SagaBTransOutCom, req).// 添加一个TransIn的子事务正向操作为url: qsBusi/TransOut 逆向操作为url: qsBusi/TransInComAdd(busi.Busi/SagaBTransIn, busi.Busi/SagaBTransInCom, req)// 提交saga事务dtm会完成所有的子事务/回滚所有的子事务logger.Debugf(busi trans submit)err : saga.Submit()2、时序图
与快速上手demo一致 3、处理网络异常
假设提交给dtm的事务中 调用转入操作时出现短暂的故障——dtm会重试未完成的操作此时要求全局事务中的各个子事务时幂等的。
子事务屏障技术提供了BranchBarrier工具类提供Call函数来保证函数内部的业务最多被调用一次。
4、处理回滚
事务失败交互的时序图 TransIn的正向操作发生在提交之前则补偿为空操作TransIn的操作如果发生在提交后则补偿操作会将数据提交一次
五、TCC型事务Demo 业务场景 A需要跨行转账给B 假设场景 转出A和转入B都可能成功和失败需要最终转入转出都成功or失败 还有一个要求假如发生回滚SAGA 模式下会发生A发现自己的余额被扣减了但是收款方B迟迟没有收到余额那么会对A造成很大的困扰。业务上面希望不要出现这种情况
TCC分为3个阶段
Try 阶段尝试执行完成所有业务检查一致性, 预留必须业务资源准隔离性Confirm 阶段如果所有分支的Try都成功了则走到Confirm阶段。Confirm真正执行业务不作任何业务检查只使用 Try 阶段预留的业务资源Cancel 阶段如果所有分支的Try有一个失败了则走到Cancel阶段。Cancel释放 Try 阶段预留的业务资源。 1、核心代码
冻结/解冻资金操作会检查约束balancetrading_balance 0如果约束不成立执行失败trading_balance 表示被冻结的金额
func tccAdjustTrading(db dtmcli.DB, uid int, amount int) error {affected, err : dtmimp.DBExec(BusiConf.Driver, db, update dtm_busi.user_accountset trading_balancetrading_balance?where user_id? and trading_balance ? balance 0, amount, uid, amount)if err nil affected 0 {return fmt.Errorf(update error, maybe balance not enough)}return err
}func tccAdjustBalance(db dtmcli.DB, uid int, amount int) error {affected, err : dtmimp.DBExec(BusiConf.Driver, db, update dtm_busi.user_accountset trading_balancetrading_balance-?,balancebalance? where user_id?, amount, amount, uid)if err nil affected 0 {return fmt.Errorf(update user_account 0 rows)}return err
} Try/Confirm/Cancel处理函数 app.POST(BusiAPI/TccBTransOutTry, dtmutil.WrapHandler(func(c *gin.Context) interface{} {req : reqFrom(c)if req.TransOutResult ! {return string2DtmError(req.TransOutResult)}bb : MustBarrierFromGin(c)if req.Store Redis {return bb.RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), req.Amount, 7*86400)} else if req.Store Mongo {return bb.MongoCall(MongoGet(), func(sc mongo.SessionContext) error {return SagaMongoAdjustBalance(sc, sc.Client(), TransOutUID, -req.Amount, )})}return bb.CallWithDB(pdbGet(), func(tx *sql.Tx) error {return tccAdjustTrading(tx, TransOutUID, -req.Amount)})}))app.POST(BusiAPI/TccBTransOutConfirm, dtmutil.WrapHandler(func(c *gin.Context) interface{} {if reqFrom(c).Store Redis || reqFrom(c).Store Mongo {return nil}return MustBarrierFromGin(c).CallWithDB(pdbGet(), func(tx *sql.Tx) error {return tccAdjustBalance(tx, TransOutUID, -reqFrom(c).Amount)})}))app.POST(BusiAPI/TccBTransOutCancel, dtmutil.WrapHandler(TccBarrierTransOutCancel))app.POST(BusiAPI/TccBTransInTry, dtmutil.WrapHandler(func(c *gin.Context) interface{} {req : reqFrom(c)if req.TransInResult ! {return string2DtmError(req.TransInResult)}return MustBarrierFromGin(c).CallWithDB(pdbGet(), func(tx *sql.Tx) error {return tccAdjustTrading(tx, TransInUID, req.Amount)})}))app.POST(BusiAPI/TccBTransInConfirm, dtmutil.WrapHandler(func(c *gin.Context) interface{} {return MustBarrierFromGin(c).CallWithDB(pdbGet(), func(tx *sql.Tx) error {return tccAdjustBalance(tx, TransInUID, reqFrom(c).Amount)})}))app.POST(BusiAPI/TccBTransInCancel, dtmutil.WrapHandler(func(c *gin.Context) interface{} {return MustBarrierFromGin(c).CallWithDB(pdbGet(), func(tx *sql.Tx) error {return tccAdjustTrading(tx, TransInUID, -reqFrom(c).Amount)})}))开启事务进行分支调用 req : busi.GenReqHTTP(30, false, false)gid : dtmimp.GetFuncName()// TccGlobalTransaction 会开启一个全局事务err : dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {// CallBranch 会将事务分支的Confirm/Cancel注册到全局事务上然后直接调用Try_, err : tcc.CallBranch(req, Busi/TccBTransOutTry, Busi/TccBTransOutConfirm, Busi/TccBTransOutCancel)assert.Nil(t, err)return tcc.CallBranch(req, Busi/TccBTransInTry, Busi/TccBTransInConfirm, Busi/TccBTransInCancel)})2、处理网络异常
同SAGA部分
3、处理回滚
事务失败交互的时序图 跟成功的TCC差别就在于当某个子事务返回失败后后续就回滚全局事务调用各个子事务的Cancel操作保证全局事务全部回滚
TransIn的正向操作发生在提交之前则补偿为空操作TransIn的操作如果发生在提交后则补偿操作会将数据提交一次