佛山关键词网站排名,珠宝 网站欣赏,wordpress换域名不能访问,软件开发设计流程图集成DTM实现跨语言分布式事务V1.0
简介
DTM是一款开源的分布式事务管理器#xff0c;解决跨数据库、跨服务、跨语言栈更新数据的一致性问题。
通俗一点说#xff0c;DTM提供跨服务事务能力#xff0c;一组服务要么全部成功#xff0c;要么全部回滚#xff0c;避免只更新…集成DTM实现跨语言分布式事务V1.0
简介
DTM是一款开源的分布式事务管理器解决跨数据库、跨服务、跨语言栈更新数据的一致性问题。
通俗一点说DTM提供跨服务事务能力一组服务要么全部成功要么全部回滚避免只更新了一部分数据产生的一致性问题。
https://dtm.pub/
安装运行DTM
以下安装可以使用Dockerfile的方式与dmc服务一起部署另外官网介绍的docker安装模式我没成功
安装go语言环境
#下载安装go语言包请使用1.20以上版本其他版本安装失败
wget -c https://dl.google.com/go/go1.20.1.linux-amd64.tar.gz -O - | sudo tar -xz -C /usr/localcd /usr/local/
mkdir gopath vi /etc/profile #在/etc/profile添加go环境配置export GOROOT/usr/local/go
export GOPATH/usr/local/gopath
export PATH$PATH:$GOROOT/bin
export GO111MODULEon # 开启 Go moudles 特性
export GOPROXYhttps://goproxy.cn,direct # 安装 Go 模块时国内代理服务器设置#查看版本
go version安装dtm服务
git clone https://github.com/dtm-labs/dtm cd dtm
go build#运行./dtm看到如下说明成功
[rootparatera128 dtm]# ./dtm
invalid log level: , switching to default: INFO
{level:info,ts:2023-07-25T18:54:17.816-0700,caller:entry/main.go:46,msg:dtm version is: v0.0.0-dev}
{level:info,ts:2023-07-25T18:54:17.818-0700,caller:config/config.go:121,msg:config file: loaded config is: \n{\n \Store\: {\n \Driver\: \boltdb\,\n \Host\: \\,\n \Port\: 0,\n \User\: \\,\n \Password\: \\,\n \Db\: \dtm\,\n \Schema\: \public\,\n \MaxOpenConns\: 500,\n \MaxIdleConns\: 500,\n \ConnMaxLifeTime\: 5,\n \DataExpire\: 604800,\n \FinishedDataExpire\: 86400,\n \RedisPrefix\: \{a}\\n },\n \TransCronInterval\: 3,\n \TimeoutToFail\: 35,\n \RetryInterval\: 10,\n \RequestTimeout\: 3,\n \HTTPPort\: 36789,\n \GrpcPort\: 36790,\n \JSONRPCPort\: 36791,\n \MicroService\: {\n \Driver\: \default\,\n \Target\: \\,\n \EndPoint\: \\\n },\n \HTTPMicroService\: {\n \Driver\: \default\,\n \RegistryType\: \\,\n \RegistryAddress\: \\,\n \RegistryOptions\: \{}\,\n \Target\: \\,\n \EndPoint\: \\\n },\n \UpdateBranchSync\: 0,\n \UpdateBranchAsyncGoroutineNum\: 1,\n \LogLevel\: \info\,\n \Log\: {\n \Outputs\: \stderr\,\n \RotationEnable\: 0,\n \RotationConfigJSON\: \{}\\n },\n \TimeZoneOffset\: \\,\n \ConfigUpdateInterval\: 3,\n \AlertRetryLimit\: 3,\n \AlertWebHook\: \\,\n \AdminBasePath\: \\\n}}
{level:info,ts:2023-07-25T18:54:17.820-0700,caller:maxprocs/maxprocs.go:47,msg:maxprocs: Leaving GOMAXPROCS1: CPU quota undefined}
{level:info,ts:2023-07-25T18:54:17.827-0700,caller:dtmsvr/svr.go:32,msg:start dtmsvr}
{level:info,ts:2023-07-25T18:54:17.828-0700,caller:dtmsvr/svr.go:51,msg:dtmsvr http listen at: 36789}
{level:info,ts:2023-07-25T18:54:17.830-0700,caller:dtmsvr/svr.go:65,msg:grpc listening at [::]:36790}
{level:info,ts:2023-07-25T18:54:17.931-0700,caller:dtmsvr/svr.go:80,msg:RegisterService: default}
{level:info,ts:2023-07-25T18:54:17.932-0700,caller:dtm/main.go:96,msg:admin is proxied to admin.dtm.pub}
{level:info,ts:2023-07-25T18:54:17.933-0700,caller:dtm/main.go:98,msg:admin is running at: http://localhost:36789}访问http://localhost:36789可以看到所有的事务状态和过程数据
数据存储
DTM部署后会默认将数据进行文件的形式存储文件名dtm.bolt当然你也可以通过修改conf.yml文件以关系型数据库redis等方式存储
如下提供MySQL的数据库脚本可以根据需要定制化监控处理功能并且DTM提供了相应的API供服务方进行RM表的数据保存
RM表资源管理器
CREATE TABLE dtm.barrier (id bigint(22) NOT NULL AUTO_INCREMENT,trans_type varchar(45) DEFAULT COMMENT 事务模式,gid varchar(128) DEFAULT COMMENT 全局事务id,branch_id varchar(128) DEFAULT COMMENT 分支事务id,op varchar(45) DEFAULT COMMENT 事务操作, barrier_id varchar(45) DEFAULT COMMENT 分支事务层级如果分支事务包含事务会递增,reason varchar(45) DEFAULT COMMENT 插入此记录的分支类型,如果成功与op相同如果失败与op相反,create_time datetime DEFAULT CURRENT_TIMESTAMP,update_time datetime DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (id),UNIQUE KEY gid (gid,branch_id,op,barrier_id),KEY create_time (create_time),KEY update_time (update_time)
) ENGINEInnoDB AUTO_INCREMENT44 DEFAULT CHARSETutf8;TM表事务管理器
CREATE TABLE if not EXISTS dtm.trans_global (id bigint(22) NOT NULL AUTO_INCREMENT,gid varchar(128) NOT NULL COMMENT global transaction id,trans_type varchar(45) not null COMMENT transaction type: saga | xa | tcc | msg,status varchar(12) NOT NULL COMMENT transaction status: prepared | submitted | aborting | succeed | failed,query_prepared varchar(1024) NOT NULL COMMENT url to check for msg|workflow,protocol varchar(45) not null comment protocol: http | grpc | json-rpc,create_time datetime DEFAULT NULL,update_time datetime DEFAULT NULL,finish_time datetime DEFAULT NULL,rollback_time datetime DEFAULT NULL,options varchar(1024) DEFAULT COMMENT options for transaction like: TimeoutToFail, RequestTimeout,custom_data varchar(1024) DEFAULT COMMENT custom data for transaction,next_cron_interval int(11) default null comment next cron interval. for use of cron job,next_cron_time datetime default null comment next time to process this trans. for use of cron job,owner varchar(128) not null default comment who is locking this trans,ext_data TEXT comment extra data for this trans. currently used in workflow pattern,result varchar(1024) DEFAULT COMMENT result for transaction,rollback_reason varchar(1024) DEFAULT COMMENT rollback reason for transaction,PRIMARY KEY (id),UNIQUE KEY gid (gid),key owner(owner),key status_next_cron_time (status, next_cron_time) comment cron job will use this index to query trans
) ENGINE InnoDB DEFAULT CHARSET utf8mb4;CREATE TABLE IF NOT EXISTS dtm.trans_branch_op (id bigint(22) NOT NULL AUTO_INCREMENT,gid varchar(128) NOT NULL COMMENT global transaction id,url varchar(1024) NOT NULL COMMENT the url of this op,data TEXT COMMENT request body, depreceated,bin_data BLOB COMMENT request body,branch_id VARCHAR(128) NOT NULL COMMENT transaction branch ID,op varchar(45) NOT NULL COMMENT transaction operation type like: action | compensate | try | confirm | cancel,status varchar(45) NOT NULL COMMENT transaction op status: prepared | succeed | failed,finish_time datetime DEFAULT NULL,rollback_time datetime DEFAULT NULL,create_time datetime DEFAULT NULL,update_time datetime DEFAULT NULL,PRIMARY KEY (id),UNIQUE KEY gid_uniq (gid, branch_id, op)
) ENGINE InnoDB DEFAULT CHARSET utf8mb4;CREATE TABLE IF NOT EXISTS dtm.kv (id bigint(22) NOT NULL AUTO_INCREMENT,cat varchar(45) NOT NULL COMMENT the category of this data,k varchar(128) NOT NULL,v TEXT,version bigint(22) default 1 COMMENT version of the value,create_time datetime default NULL,update_time datetime DEFAULT NULL,PRIMARY KEY (id),UNIQUE key uniq_k(cat, k)
) ENGINE InnoDB DEFAULT CHARSET utf8mb4;DTM入门示例
maven依赖
dependencygroupIdio.github.dtm-labs/groupIdartifactIddtmcli-java/artifactIdversion2.1.4/version
/dependencyapplication.yml
server:port: 8081#DTM服务地址
dtm:ipport: 192.168.137.128:36789spring:datasource:url: jdbc:mysql://182.92.67.160:3316/dtm?useUnicodetruecharacterEncodingutf-8useSSLtrueserverTimezoneUTCusername: rootpassword: parateradriver-class-name: com.mysql.cj.jdbc.DriverSaga模式
将长事务拆分为多个分支事务由Saga事务协调器协调如果每个分支事务都成功提交完成那么全局事务就正常完成如果某个步骤失败则根据相反顺序一次调用补偿操作。
时序图
成功完成 失败回滚 模拟正向操作及反向补偿
RequestMapping(TransOut)
public Object TransOut() {try {logger.info(TransOut);return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);}catch (Exception e){return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);}
}RequestMapping(TransOutCompensate)
public Object TransOutCompensate() {try {logger.info(TransOutCompensate);return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);}catch (Exception e){return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);}
}RequestMapping(TransIn)
public Object TransIn() {try {logger.info(TransIn);//int i 1/0 ;模拟异常时回滚return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);}catch (Exception e){return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);}
}RequestMapping(TransInCompensate)
public Object TransInCompensate() {try {logger.info(TransInCompensate);return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);}catch (Exception e){return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);}
}执行全局事务
//这个地方如果DTM服务跟项目不是同一台机器请填写IP地址并保障可以ping通
private static final String svc http://192.168.110.174:8081/api;Value(${dtm.ipport})
private String ipPort;RequestMapping(testSaga)
public String testSage() {DtmClient dtmClient new DtmClient(ipPort);try {// 全局事务id保证唯一即可String customGid UUID.randomUUID().toString();Saga saga dtmClient.newSaga(customGid)//按业务顺序将正向操作和反向补偿操作注入到Saga流中通过事务协调器执行第三个参数为分支接口传参正向操作和反向补偿传参都是一样的.add(svc /TransOut, svc /TransOutCompensate, null).add(svc /TransIn, svc /TransInCompensate, null).enableWaitResult();saga.submit();} catch (Exception e) {log.error(saga submit error, e);return fail;}return success;
}成功完成 失败回滚 TCC模式
TCC是Try、Confirm、Cancel三个词语的缩写
Try 阶段尝试执行完成所有业务检查一致性, 预留必须业务资源准隔离性Confirm 阶段如果所有分支的Try都成功了则走到Confirm阶段。Confirm真正执行业务不作任何业务检查只使用 Try 阶段预留的业务资源Cancel 阶段如果所有分支的Try有一个失败了则走到Cancel阶段。Cancel释放 Try 阶段预留的业务资源。
时序图
成功完成 失败回滚 模拟三阶段事务
RequestMapping(TransOutTry)
public Object TransOutTry() {try {logger.info(TransOutTry);return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);}catch (Exception e){return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);}
}RequestMapping(TransOutConfirm)
public Object TransOutConfirm() {try {logger.info(TransOutConfirm);return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);}catch (Exception e){return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);}
}RequestMapping(TransOutCancel)
public Object TransOutCancel() {try {logger.info(TransOutCancel);return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);}catch (Exception e){return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);}
}RequestMapping(TransInTry)
public Object TransInTry() {try {logger.info(TransInTry);return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);}catch (Exception e){return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);}
}RequestMapping(TransInConfirm)
public Object TransInConfirm() {try {logger.info(TransInConfirm);return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);}catch (Exception e){return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);}
}RequestMapping(TransInCancel)
public Object TransInCancel() {try {logger.info(TransInCancel);return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);}catch (Exception e){return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);}
}注册执行事务分支
private static final String svc http://192.168.110.174:8081/api;Value(${dtm.ipport})
private String ipport;RequestMapping(testTcc)
public String testTcc() {//创建dtm clinetDtmClient dtmClient new DtmClient(ipport);//创建tcc事务String customGid UUID.randomUUID().toString();try {dtmClient.tccGlobalTransaction(customGid, tcc - {//第一个参数为接口传参三阶段对应的都是相同的参数tcc.callBranch(null, svc /TransOutTry, svc /TransOutConfirm, svc /TransOutCancel);tcc.callBranch(null, svc /TransInTry, svc /TransInConfirm, svc /TransInCancel);});} catch (Exception e) {log.error(tccGlobalTransaction error, e);return fail;}return success;
}成功完成 失败回滚 操作异常思考
假如Compensate/Confirm/Cancel操作遇见失败会怎么样按照业务的思路Compensate/Confirm/Cancel操作是要求最终成功的遇见失败的情况都是由于临时故障或者程序bug。dtm在Compensate/Confirm/Cancel操作遇见失败时会不断进行重试时间间隔为10 * 2的幂次方直到成功当然如果是因为业务条件不满足等非技术环境问题不应该进入重试这一点应该在程序编写时充分考虑。
为了避免程序bug导致补偿操作一直无法成功可以通过DTM管理控制台进行监控由运维人员手动后台处理强制停止不过这个目前做得比较粗糙凑合着用 另外如果对业务没有特别的要求可以暴力设置超时回滚
saga.setTimeoutToFail(1800);Saga与TCC模式对比
其实还有两阶段、XA模式等因为并发锁的原因性能比较低特别是对于死锁的处理需要大量的人工介入
SagaTCC难度简单只需要正反两个分支事务复杂需要Try、Confirm、Cancel三个分支事务一致性低缺少资源预留并发度高时可能出现脏读高一开始就会对所有资源进行预留try并发执行能力支持并发和按序执行不支持并发事务嵌套不支持支持
先易后难我们后续的功能开发先基于Saga模式进行
DMC平台集成分布式事务V1.0
数据库设计
DROP TABLE IF EXISTS trans_setting;
CREATE TABLE IF NOT EXISTS trans_setting (id bigint(22) NOT NULL AUTO_INCREMENT,call_url varchar(45) NOT NULL COMMENT 全局事务执行标志原则上只需要保证唯一性即可建议使用项目名模块名方法名如console.user.getUser,create_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,updated_at datetime ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (id),UNIQUE KEY call_url (call_url)
) ENGINE InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci COMMENT 全局事务执行表 ROW_FORMAT Compact;DROP TABLE IF EXISTS trans_setting_detail;
CREATE TABLE IF NOT EXISTS trans_setting_detail (id bigint(22) NOT NULL AUTO_INCREMENT,fid bigint(22) NOT NULL COMMENT 全局事务URL表主键,branch_url varchar(100) NOT NULL COMMENT 分支url完整地址ip、port、restful接口,branch_rb_url varchar(100) NOT NULL COMMENT 分支回滚url完整地址ip、port、restful接口,create_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,updated_at datetime ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (id),UNIQUE KEY fid_branch_url (fid,branch_url)
) ENGINE InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci COMMENT 分支事务URL表 ROW_FORMAT Compact;增删改查
略…
DTM服务地址配置
dtm:url: 192.168.137.128:36789全局事务执行接口
通过restful接口的方式兼容跨语言、服务、数据库的全局事务处理
/*** 分布式事务统一调度接口** param callUrl 配置的全局事务执行标志* param postData 分支事务接口参数* return*/
PostMapping(/s-api/globalTransaction)
public String execGlobalTransaction(String callUrl, Object postData) {return transactionService.execGlobalTransaction(callUrl, postData);
}Value(${dtm.url})
private String dtmUrl;public String execGlobalTransaction(String callUrl, Object postData) {QueryWrapper wrapper new QueryWrapper();wrapper.eq(call_url, callUrl);TransSetting transSetting settingDao.selectOne(wrapper);wrapper.clear();wrapper.eq(fid, transSetting.getId());ListTransSettingDetail list settingDetailDao.selectList(wrapper);DtmClient dtmClient new DtmClient(dtmUrl);try {//使用callUrl 时间戳 四位随机数作为gid比较好定位问题String time DateUtil.date2String(Calendar.getInstance().getTime(), DateUtil.DATE_TIME);int number new Random().nextInt(9000) 1000;String customGid callUrl : time : number;Saga saga dtmClient.newSaga(customGid);//循环遍历添加分支urllist.forEach(detail - {saga.add(detail.getBranchUrl(), detail.getBranchRbUrl(), postData);});saga.enableWaitResult();saga.submit();} catch (Exception e) {log.error(saga submit error, e);return fail;}return success;
}调度测试 从DTM控制台可以看到调度成功并且没有触发回滚 我们人为在转入分支制造异常再次测试 调度失败发生回滚