当前位置: 首页 > news >正文

有哪些做农产品的网站有哪些陕西省交通建设公司网站

有哪些做农产品的网站有哪些,陕西省交通建设公司网站,社交电商怎么做赚钱,火车头采集器wordpress下载分布式事务与Seata落地 一、事务基础 1.1 本地事务 事务指的就是一个操作单元#xff0c;在这个操作单元中的所有操作最终要保持一致的行为#xff0c;要么所有操作都成功#xff0c;要么所有的操作都被撤销。 1.2 本地事务特性 本地事务四大特性: ACID A#xff1a;原…分布式事务与Seata落地 一、事务基础 1.1 本地事务 事务指的就是一个操作单元在这个操作单元中的所有操作最终要保持一致的行为要么所有操作都成功要么所有的操作都被撤销。 1.2 本地事务特性 本地事务四大特性: ACID A原子性(Atomicity)一个事务中的所有操作要么全部完成要么全部不完成 C一致性(Consistency)在一个事务执行之前和执行之后数据库都必须处于一致性状态 I隔离性(Isolation)在并发环境中当不同的事务同时操作相同的数据时事务之间互不影响 D持久性(Durability)指的是只要事务成功结束它对数据库所做的更新就必须永久的保存下来 数据库事务在实现时会将一次事务涉及的所有操作全部纳入到一个不可分割的执行单元该执行单元中的所有操作要么都成功要么都失败只要其中任一操作执行失败都将导致整个事务的回滚。 1.3 事务实现原理 以MySQL 的InnoDB InnoDB 是 MySQL 的一个存储引擎为例介绍一下单一数据库的事务实现原理。 InnoDB 是通过 日志和锁 来保证的事务的 ACID特性具体如下 1通过数据库锁的机制保障事务的隔离性 2通过 Redo Log重做日志来保障事务的持久性 3通过 Undo Log 撤销日志来保障事务的原子性 4通过 Undo Log 撤销日志来保障事务的一致性 Undo Log 如何保障事务的原子性呢 具体的方式为在操作任何数据之前首先将数据备份到一个地方这个存储数据备份的地方称为 Undo Log然后进行数据的修改。如果出现了错误或者用户执行了 Rollback 语句系统可以利用 Undo Log 中的备份将数据恢复到事务开始之前的状态。 Redo Log如何保障事务的持久性呢 具体的方式为Redo Log 记录的是新数据的备份和 Undo Log 相反。在事务提交前只要将 Redo Log 持久化即可不需要将数据持久化。当系统崩溃时虽然数据没有持久化但是 Redo Log 已经持久化。系统可以根据 Redo Log 的内容将所有数据恢复到崩溃之前的状态。 1.4 分布式事务 分布式事务在分布式系统中一次操作需要由多个(微)服务协同完成这种由不同的服务之间通过网络协同完成的事务称为分布式事务 通俗讲就是一次大的操作由不同的小操作组成这些小的操作分布在不同的服务器上且属于不同的应用分布式事务需要保证这些小操作要么全部成功要么全部失败。 大白话: 分布式环境下保证各个事务的ACID 1.5 分布式事务场景 1.5.1 跨库事务 跨库事务指的是一个相对复杂的逻辑功能需要操作多个库数据不同的库中存储不同的业务数据。 此时操作需要考虑DB1事务也需要考虑DB2的事务妥妥的分布式事务。 1.5.2 分库分表 当一个库/或者一张表数据量比较大或者预期未来的数据量会有比较大增长时再设计表是考虑水平拆分即分库分表。 DB2分库分表之后执行DML操作时需要考虑分区1分区2操作一起成功一起失败。此时service面临着分布式事务的问题。 1.5.3 微服务化 一个逻辑多个微服务参与涉及多个数据库数据改动又是一个分布式事务场景。 1.6 分布式事务特性 1.6.1 CAP概念 CAP定理是分布式系统中的重要理论在一个分布式系统中最多只能同时满足一致性Consistency、可用性Availability和分区容错性Partition tolerance这三项中的两项不能同时满足这三项。 CConsistency一致性 一致性指all nodes see the same data at the same time即更新操作成功后所有节点在同一时间的数据完全一致。 AAvailability可用性 可用性指reads and writes always succeed即服务一直可用且能够正常响应不保证返回的是最新写入的数据。 PPartition tolerance分区容错性 分区容错性指the system continues to operate despite arbitrary message loss or failure of part of the system即分布式系统在遇到某节点或网络分区故障的时候仍然能够对外提供满足一致性和可用性的服务。 1.6.2 CAP选择 无数实践证明CAP只存在理想的状态真实的的分布式系统必须在CAP 3者中选其2 CA 放弃P放弃分区容错性的话则放弃了分布式放弃了系统的可扩展性 CP 放弃A放弃可用性的话则在遇到网络分区或其他故障时受影响的服务需要等待一定的时间再此期间无法对外提供政策的服务即不可用 AP 放弃C放弃一致性的话则系统无法保证数据保持实时的一致性在数据达到最终一致性时有个时间窗口在时间窗口内数据是不一致的。 注意对于分布式系统来说P是不能放弃的因此架构师通常是在可用性和一致性之间权衡。 16.3 CAP无法共存证明 理想操作 用户第一次访问通过Nginx后走网段1执行数据添加数据同步到网段2的数据库中 用户第二次访问通过Nginx后走网段2执行数据查询查询到第一次添加的啊data数据 非理想状态 用户第一次访问通过Nginx后走网段1执行数据添加数据无法同步到网段2的数据库 用户第二次访问通过Nginx后走网段2执行数据查询 分布式系统存在意义最低要求必须保证分区容错性成立那么就意味着上面操作存在2种选择。 选择可用性(A)牺牲一致性©返回空数据 选择一致性©牺牲可用性(A)阻塞等待直到网段1网段2连通数据同步完成之后再返回data数据。 1.6.4 BASE BASE是Basically Available(基本可用、**Soft state(软状态和Eventually consistent(最终一致性**三个短语的简写。 BASE是对CAP中一致性和可用性权衡的结果其来源于对大规模互联网系统分布式实践的总结是基于CAP定理逐步演化而来的其核心思想是即使无法做到强一致性但每个应用都可以根据自身的业务特点采用适当的方法来使系统达到最终一致性。 Basically Available(基本可用 基本可用指分布式系统在出现故障时系统允许损失部分可用性即保证核心功能或者当前最重要功能可用。对于用户来说他们当前最关注的功能或者最常用的功能的可用性将会获得保证但是其他功能会被削弱 Soft state(软状态 软状态指的是允许系统中的数据存在中间状态并认为该状态不影响系统的整体可用性即允许系统在多个不同节点的数据副本存在数据延时。 Eventually consistent(最终一致性 上面说软状态然后不可能一直是软状态必须有个时间期限。在期限过后应当保证所有副本保持数据一致性从而达到数据的最终一致性。这个时间期限取决于网络延时、系统负载、数据复制方案设计等等因素。 总结 总的来说BASE 理论面向的是大型高可用可扩展的分布式系统和传统事务的 ACID 是相反的它完全不同于 ACID 的强一致性模型而是通过牺牲强一致性来获得可用性并允许数据在一段时间是不一致的。 二、分布式事务问题 2.1 需求 需求用户下单扣款扣库存。 2.2 项目演示 根据上面分析项目设计出3个微服务 业务服务business-service 订单服务order-service 账户服务account-service 库存服务stock-service 代码如下 2.2.1 数据库准备 创建3个数据库与3张表 seata-account CREATE TABLE t_account (id int(11) NOT NULL AUTO_INCREMENT,user_id varchar(255) DEFAULT NULL,money int(11) DEFAULT 0,PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf8;INSERT INTO t_account VALUES (1, U100000, 900);seata-order CREATE TABLE t_order (id int(11) NOT NULL AUTO_INCREMENT,user_id varchar(255) DEFAULT NULL,commodity_code varchar(255) DEFAULT NULL,count int(11) DEFAULT 0,money int(11) DEFAULT 0,PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf8;seata-stock CREATE TABLE t_stock (id int(11) NOT NULL AUTO_INCREMENT,commodity_code varchar(255) DEFAULT NULL,count int(11) DEFAULT 0,PRIMARY KEY (id),UNIQUE KEY (commodity_code) ) ENGINEInnoDB DEFAULT CHARSETutf8;INSERT INTO t_stock VALUES (1, C100000, 10);2.2.2 库存服务 stock-service 依赖 dependencies!-- bootstrap 启动器 --dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-bootstrap/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactId/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.56/version/dependencydependencygroupIdcom.baomidou/groupIdartifactIdmybatis-plus-boot-starter/artifactIdversion3.4.2/version/dependency!--nacos客户端--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-discovery/artifactId/dependency!--fegin组件--dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-openfeign/artifactId/dependency!--sentinel组件--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-sentinel/artifactId/dependencydependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-config/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-loadbalancer/artifactId/dependency /dependencies配置文件 # Tomcat server:port: 8083 # Spring spring:application:# 应用名称name: stock-serviceprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: 127.0.0.1:8848datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql:///seata-stock?serverTimezoneUTCuseUnicodetruecharacterEncodingutf-8useSSLtrueusername: rootpassword: admin mybatis-plus:configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpldomain package cn.xxx.tx.stock.domain;import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data;Data TableName(t_stock) public class Stock {TableId(type IdType.AUTO)private Integer id;private String commodityCode;private Integer count; } mapper package cn.xxx.tx.stock.mapper;import cn.xxx.tx.stock.domain.Stock; import com.baomidou.mybatisplus.core.mapper.BaseMapper;public interface StockMapper extends BaseMapperStock { } service package cn.xxx.tx.stock.service;import cn.xxx.tx.stock.domain.Stock; import com.baomidou.mybatisplus.extension.service.IService;public interface IStockService extends IServiceStock {/*** 扣库存* param commodityCode* param count*/void deduct(String commodityCode, int count); } service.impl package cn.xxx.tx.stock.service.impl;import cn.xxx.tx.stock.domain.Stock; import cn.xxx.tx.stock.mapper.StockMapper; import cn.xxx.tx.stock.service.IStockService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import io.seata.core.context.RootContext; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional;Service public class StockServiceImpl extends ServiceImplStockMapper, Stock implements IStockService {OverrideTransactionalpublic void deduct(String commodityCode, int count) {Stock one lambdaQuery().eq(Stock::getCommodityCode, commodityCode).one();if(one ! null one.getCount() count){throw new RuntimeException(Not Enough Count ...);}lambdaUpdate().setSql(count count- count).eq(Stock::getCommodityCode, commodityCode).update();} } controller package cn.xxx.tx.stock.controller;import cn.xxx.tx.stock.service.IStockService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*;RestController RequestMapping(stocks) public class StockController {Autowiredprivate IStockService StockService;Autowiredprivate IStockService stockService;GetMapping(value /deduct)public String deduct(String commodityCode, int count) {try {stockService.deduct(commodityCode, count);} catch (Exception exx) {exx.printStackTrace();return FAIL;}return SUCCESS;} } 启动类 package cn.xxx.tx;import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.openfeign.EnableFeignClients;SpringBootApplication EnableDiscoveryClient EnableFeignClients MapperScan(cn.xxx.tx.stock.mapper) public class StockApplication {public static void main(String[] args) {SpringApplication.run(StockApplication.class, args);} } 2.2.3 账户服务 account-service 依赖 dependencies!-- bootstrap 启动器 --dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-bootstrap/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactId/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.56/version/dependencydependencygroupIdcom.baomidou/groupIdartifactIdmybatis-plus-boot-starter/artifactIdversion3.4.2/version/dependency!--nacos客户端--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-discovery/artifactId/dependency!--fegin组件--dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-openfeign/artifactId/dependency!--sentinel组件--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-sentinel/artifactId/dependencydependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-config/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency /dependencies配置文件 # Tomcat server:port: 8081 # Spring spring:application:# 应用名称name: account-serviceprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: 127.0.0.1:8848datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql:///seata-account?serverTimezoneUTCuseUnicodetruecharacterEncodingutf-8useSSLtrueusername: rootpassword: admin mybatis-plus:configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpl domain package cn.xxx.tx.account.domain;import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data;Data TableName(t_account) public class Account {TableId(type IdType.AUTO)private Integer id;private String userId;private int money; } mapper package cn.xxx.tx.account.mapper;import cn.xxx.tx.account.domain.Account; import com.baomidou.mybatisplus.core.mapper.BaseMapper;public interface AccountMapper extends BaseMapperAccount { } service package cn.xxx.tx.account.service;import cn.xxx.tx.account.domain.Account; import com.baomidou.mybatisplus.extension.service.IService;public interface IAccountService extends IServiceAccount {/*** 账户扣款* param userId* param money* return*/void reduce(String userId, int money); } service.impl package cn.xxx.tx.account.service.impl;import cn.xxx.tx.account.domain.Account; import cn.xxx.tx.account.mapper.AccountMapper; import cn.xxx.tx.account.service.IAccountService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import io.seata.core.context.RootContext; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional;Service public class AccountServiceImpl extends ServiceImplAccountMapper, Account implements IAccountService {OverrideTransactionalpublic void reduce(String userId, int money) {Account one lambdaQuery().eq(Account::getUserId, userId).one();if(one ! null one.getMoney() money){throw new RuntimeException(Not Enough Money ...);}lambdaUpdate().setSql(money money - money).eq(Account::getUserId, userId).update();} } controller package cn.xxx.tx.account.controller;import cn.xxx.tx.account.domain.Account; import cn.xxx.tx.account.service.IAccountService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*;RestController RequestMapping(accounts) public class AccountController {Autowiredprivate IAccountService accountService;GetMapping(value /reduce)public String reduce(String userId, int money) {try {accountService.reduce(userId, money);} catch (Exception exx) {exx.printStackTrace();return FAIL;}return SUCCESS;}} 启动类 package cn.xxx.tx;import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.openfeign.EnableFeignClients;SpringBootApplication MapperScan(cn.xxx.tx.account.mapper) EnableDiscoveryClient EnableFeignClients public class AccountApplication {public static void main(String[] args) {SpringApplication.run(AccountApplication.class, args);} } 2.2.4 订单服务 order-service 依赖 dependencies!-- bootstrap 启动器 --dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-bootstrap/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactId/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.56/version/dependencydependencygroupIdcom.baomidou/groupIdartifactIdmybatis-plus-boot-starter/artifactIdversion3.4.2/version/dependency!--nacos客户端--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-discovery/artifactId/dependency!--fegin组件--dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-openfeign/artifactId/dependency!--sentinel组件--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-sentinel/artifactId/dependencydependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-config/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-loadbalancer/artifactId/dependency /dependencies配置文件 # Tomcat server:port: 8082 # Spring spring:application:# 应用名称name: order-serviceprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: 127.0.0.1:8848datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql:///seata-order?serverTimezoneUTCuseUnicodetruecharacterEncodingutf-8useSSLtrueusername: rootpassword: adminmybatis-plus:configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpldomain package cn.xxx.tx.order.domain;import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data;Data TableName(t_order) public class Order {TableId(type IdType.AUTO)private Integer id;private String userId;private String commodityCode;private Integer count;private Integer money; } mapper package cn.xxx.tx.order.mapper;import cn.xxx.tx.order.domain.Order; import com.baomidou.mybatisplus.core.mapper.BaseMapper;public interface OrderMapper extends BaseMapperOrder { } service package cn.xxx.tx.order.service;import cn.xxx.tx.order.domain.Order; import com.baomidou.mybatisplus.extension.service.IService;public interface IOrderService extends IServiceOrder {/*** 创建订单*/void create(String userId, String commodityCode, int orderCount); } service.impl package cn.xxx.tx.order.service.impl;import cn.xxx.tx.order.domain.Order; import cn.xxx.tx.order.feign.AccountFeignClient; import cn.xxx.tx.order.mapper.OrderMapper; import cn.xxx.tx.order.service.IOrderService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import io.seata.core.context.RootContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional;Service Transactional public class OrderServiceImpl extends ServiceImplOrderMapper, Order implements IOrderService {Autowiredprivate AccountFeignClient accountFeignClient;OverrideTransactionalpublic void create(String userId, String commodityCode, int count) {// 定单总价 订购数量(count) * 商品单价(100)int orderMoney count * 100;// 生成订单Order order new Order();order.setCount(count);order.setCommodityCode(commodityCode);order.setUserId(userId);order.setMoney(orderMoney);super.save(order);// 调用账户余额扣减String result accountFeignClient.reduce(userId, orderMoney);if (!SUCCESS.equals(result)) {throw new RuntimeException(Failed to call Account Service. );}} } controller package cn.xxx.tx.order.controller;import cn.xxx.tx.order.domain.Order; import cn.xxx.tx.order.service.IOrderService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*;RestController RequestMapping(orders) public class OrderController {Autowiredprivate IOrderService orderService;GetMapping(value /create)public String create(String userId, String commodityCode, int orderCount) {try {orderService.create(userId, commodityCode, orderCount);} catch (Exception exx) {exx.printStackTrace();return FAIL;}return SUCCESS;} } Feign接口 package cn.xxx.tx.order.feign;import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam;FeignClient(name account-service) public interface AccountFeignClient {GetMapping(/accounts/reduce)String reduce(RequestParam(userId) String userId, RequestParam(money) int money); } 启动类 package cn.xxx.tx;import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.openfeign.EnableFeignClients;SpringBootApplication MapperScan(cn.xxx.tx.order.mapper) EnableDiscoveryClient EnableFeignClients public class OrderApplication {public static void main(String[] args) {SpringApplication.run(OrderApplication.class, args);} } 2.2.6 业务服务 business-service 依赖 dependencies!-- bootstrap 启动器 --dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-bootstrap/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.56/version/dependency!--nacos客户端--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-discovery/artifactId/dependency!--fegin组件--dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-openfeign/artifactId/dependency!--sentinel组件--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-sentinel/artifactId/dependencydependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-config/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-loadbalancer/artifactId/dependency /dependencies配置文件 # Tomcat server:port: 8088 # Spring spring:application:# 应用名称name: business-serviceprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: 127.0.0.1:8848 测试数据 package cn.xxx.tx.business;public class TestDatas {public static final String USER_ID U100000;public static final String COMMODITY_CODE C100000; } service package cn.xxx.tx.business.service;public interface IBusinessService{void purchase(String userId, String commodityCode, int orderCount, boolean rollback); } service.impl package cn.xxx.tx.business.service.impl;import cn.xxx.tx.business.feign.OrderFeignClient; import cn.xxx.tx.business.feign.StockFeignClient; import cn.xxx.tx.business.service.IBusinessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;Service public class BusinessServiceImpl implements IBusinessService {private static final Logger LOGGER LoggerFactory.getLogger(BusinessServiceImpl.class);Autowiredprivate StockFeignClient stockFeignClient;Autowiredprivate OrderFeignClient orderFeignClient;Overridepublic void purchase(String userId, String commodityCode, int orderCount, boolean rollback) {String result stockFeignClient.deduct(commodityCode, orderCount);if (!SUCCESS.equals(result)) {throw new RuntimeException(库存服务调用失败,事务回滚!);}result orderFeignClient.create(userId, commodityCode, orderCount);if (!SUCCESS.equals(result)) {throw new RuntimeException(订单服务调用失败,事务回滚!);}if (rollback) {throw new RuntimeException(Force rollback ... );}} } feign接口 package cn.xxx.tx.business.feign;import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam;FeignClient(name order-service) public interface OrderFeignClient {GetMapping(/orders/create)String create(RequestParam(userId) String userId, RequestParam(commodityCode) String commodityCode,RequestParam(orderCount) int orderCount);} package cn.xxx.tx.business.feign;import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam;FeignClient(name stock-service) public interface StockFeignClient {GetMapping(/stocks/deduct)String deduct(RequestParam(commodityCode) String commodityCode, RequestParam(count) int count);} controller package cn.xxx.tx.business.controller;import cn.xxx.tx.business.TestDatas; import cn.xxx.tx.business.service.IBusinessService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;RestController RequestMapping(businesses) public class BusinessController {Autowiredprivate IBusinessService businessService;GetMapping(value /purchase)public String purchase(Boolean rollback, Integer count) {int orderCount 10;if (count ! null) {orderCount count;}try {businessService.purchase(TestDatas.USER_ID, TestDatas.COMMODITY_CODE, orderCount,rollback null ? false : rollback.booleanValue());} catch (Exception exx) {return Purchase Failed: exx.getMessage();}return SUCCESS;} } 启动类 package cn.xxx.tx;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.openfeign.EnableFeignClients;SpringBootApplication EnableDiscoveryClient EnableFeignClients public class BusinessApplication {public static void main(String[] args) {SpringApplication.run(BusinessApplication.class, args);} } 2.2.5 测试 启动nacos, 启动4个服务, 访问: http://localhost:8088/businesses/purchase?rollbackfalsecount10 2.3 提出问题 关闭order-service 再发起请求会怎样 此时该怎么办? 此时就需要用到分布式事务啦~ 三、分布式事务常用方案 方案汇总 先上一幅图: 尼恩大神画的 原地址https://blog.csdn.net/crazymakercircle/article/details/109459593 很明显可以看出分布式事务后续演变成2条路径 CP(一致性 分区) 放弃可用性,保证数据强一致性. **经典方案: 12PC 23PC ** AP(可用性 分区) 暂时放弃一致性,保证可用,后续通过某种手段(比如: MQ/程序补偿)打到最终一致性性. 经典方案: 1本地消息表 2MQ消息事务 3TCC 4SAGA 方案12PC XA协议 要讲清楚2PC模式需要科普一个协议XA协议 XA协议是X/Open的组织定义的分布式事务处理标准规范(DTP)。它定义了全局的事务管理器Transaction Manager用于协调全局事务和局部的资源管理器Resource Manager用于驱动本地事务之间的通讯接口。在TM与多个RM之间形成一个双向通信桥梁从而在多个数据库资源下保证ACID四个特性。目前几乎所有的主流数据库都对XA规范提供了支持。** XA协议包含有几个角色 AP(Application Program) : 既应用程序可以理解为使用DTP分布式事务的程序。RM(Resource Manager) : 即资源管理器可以理解为事务的参与者一般情况下是指一个数据库实例通过资源管理器对该数据库进行控制资源管理器控制着分支事务。TM(Transaction Manager) : 事务管理器负责协调和管理事务事务管理器控制着全局事务管理事务生命周期并协调各个RM。全局事务是指分布式事务处理环境中需要操作多个数据库共同完成一个工作这个工作即是一个全局事务。 2PCTwo-phase commit protocol中文叫二阶段提交。 二阶段提交是一种强一致性设计2PC 引入一个事务协调者(TM)的角色来协调管理各参与者也可称之为各本地资源RM的提交和回滚二阶段分别指的是准备和提交两个阶段。 第一阶段准备阶段 准备阶段事务协调者™会给各事务参与者(RM)发送准备命令(prepare)参与者准备成功后返回(ready) 协调者向所有参与者发送事务操作指令参与者执行除了事务提交外所有操作如参与者执行成功给协调者反馈执行成功否则反馈中止表示事务失败 第二阶段提交阶段 协调者收到各个参与者的准备消息后根据反馈情况通知各个参与者commit提交或者rollback回滚 1commit提交 当第一阶段所有参与者都反馈成功时协调者发起正式提交事务的请求当所有参与者都回复提交成功时则意味着完成事务。 协调者节点向所有参与者发出正式提交的 commit 请求。收到协调者的 commit 请求后参与者正式执行事务提交操作并释放在整个事务期间内占用的资源。参与者完成事务提交后向协调者节点发送已提交消息。协调者节点收到所有参与者节点反馈的已提交消息后完成事务。 2rollback回滚 如果任意一个参与者节点在第一阶段返回的消息为中止(或者异常)或者协调者节点在第一阶段的询问超时无法获取到全部参数者反馈那么这个事务将会被回滚。 协调者向所有参与者发出 rollback 回滚操作的请求 参与者执行事务回滚并释放在整个事务期间内占用的资源 参与者在完成事务回滚之后向协调者发送回滚完成的反馈消息 协调者收到所有参与者反馈的消息后取消事务 优缺点 缺点 性能问题执行过程中所有参与节点都是事务阻塞性的当参与者占有公共资源时其他第三方节点访问公共资源就不得不处于阻塞状态为了数据的一致性而牺牲了可用性对性能影响较大不适合高并发高性能场景 可靠性问题2PC非常依赖协调者当协调者发生故障时尤其是第二阶段那么所有的参与者就会都处于锁定事务资源的状态中而无法继续完成事务操作 数据一致性问题在阶段二中当协调者向参与者发送commit请求之后发生了局部网络异常或者在发送commit请求过程中协调者发生了故障这回导致只有一部分参与者接受到了commit请求。而在这部分参与者接到commit请求之后就会执行commit操作。但是其他部分未接到commit请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据不一致性的现象。 二阶段无法解决的问题协调者在发出 commit 消息之后宕机而唯一接收到这条消息的参与者同时也宕机了那么即使协调者通过选举协议产生了新的协调者这条事务的状态也是不确定的没人知道事务是否被已经提交。 优点 尽量保证了数据的强一致适合对数据强一致要求很高的关键领域。 方案23PC 概念 3PC三阶段提交协议是二阶段提交协议的改进版本以解决2PC存在的缺陷问题, 具体改进如下: 在协调者和参与者中都引入超时机制 引入确认机制当所有参与者能正常工作才执行事务 所以3PC分为3个阶段CanCommit 准备阶段、PreCommit 预提交阶段、DoCommit 提交阶段。 第一阶段CanCommit 准备阶段 协调者向参与者发送 canCommit 请求参与者如果可以提交就返回Yes响应否则返回No响应具体流程如下 事务询问协调者向所有参与者发出包含事务内容的 canCommit 请求询问是否可以提交事务并等待所有参与者答复。 响应反馈参与者收到 canCommit 请求后如果认为可以执行事务操作则反馈 yes 并进入预备状态否则反馈 no。 第二阶段PreCommit 阶段 协调者根据参与者的反应情况来决定是否可以进行事务的 PreCommit 操作。根据响应情况有以下两种可能 执行事务返回都是yes 所有参与者向协调者发送了Yes响应将会执行执行事务 协调者向参与者发送 PreCommit 请求并进入准备阶段参与者接收到 PreCommit 请求后会执行本地事务操作但不提交事务参与者成功的执行了事务操作后返回ACK响应同时开始等待最终指令。 中断事务返回存在no 如果存在一个参与者向协调者发送了No响应或者等待超时之后协调者都没有接到参与者的响应那么就执行事务的中断 协调者向所有参与者发送 abort 请求。参与者收到来自协调者的 abort 请求之后或超时之后仍未收到协调者的请求执行事务的中断。 第三阶段doCommit阶段 该阶段进行真正的事务提交也存在2种情况 提交事务返回都是yes 第二阶段的preCommit 请求所有参与者向协调者发送了Yes响应将会提交事务 协调接收到所有参与者发送的ACK响应那么他将从预提交状态进入到提交状态并向所有参与者发送 doCommit 请求参与者接收到doCommit请求之后执行正式的事务提交并在完成事务提交之后释放所有事务资源事务提交完之后参与者向协调者发送ack响应。协调者接收到所有参与者的ack响应之后完成事务。 中断事务返回存在no 如果存在一个参与者向协调者发送了No响应或者等待超时之后协调者都没有接到参与者的响应那么就执行事务的中断 协调者处向所有参与者发出 abort 请求参与者接收到abort请求之后马上回滚事务释放所有的事务资源。参与者完成事务回滚之后向协调者反馈ACK消息协调者接收到参与者反馈的ACK消息之后执行事务的中断。 优缺点 缺点 数据不一致问题依然存在当在参与者收到 preCommit 请求后等待 doCommit 指令时此时如果协调者请求中断事务而协调者无法与参与者正常通信会导致参与者继续提交事务造成数据不一致。 优点 相比二阶段提交三阶段提交降低了阻塞范围在等待超时后协调者或参与者会中断事务。避免了协调者单点问题阶段 3 中协调者出现问题时参与者会继续提交事务。 方案3TCC 概念 TCCTry Confirm Cancel方案是一种应用层面侵入业务的两阶段提交。是目前最火的一种分布式事务方案其核心思想是针对每个操作都要注册一个与其对应的确认和补偿撤销操作。 怎么理解看下图 一个完整的 TCC 业务由一个主业务服务和若干个从业务服务组成主业务服务发起并完成整个业务活动TCC 模式要求从服务提供三个接口Try、Confirm、Cancel。 Try(尝试) 这个过程并未执行业务只是完成所有业务的一致性检查并预留好执行所需的全部资源 Confirm(确认) 确认执行业务操作不做任何业务检查 只使用Try阶段预留的业务资源。通常情况下采用TCC 则认为 Confirm阶段是不会出错的。即只要Try成功Confirm一定成功。若Confirm阶段真的 出错了需引入重试机制或人工处理。 Cancel(取消) 取消Try阶段预留的业务资源。通常情况下采用TCC则认为Cancel阶段也是一定成功的。若 Cancel阶段真的出错了需引入重试机制或人工处理。 注意要点 并且由于 confirm 或者 cancel 有可能会重试因此对应的部分需要支持幂等。 经典案例 需求用户下单扣库存扣款 假设 库存总数10购买2账户余额1000扣款200 优缺点 优点 性能提升具体业务来实现控制资源锁的粒度变小不会锁定整个资源。数据最终一致性基于 Confirm 和 Cancel 的幂等性保证事务最终完成确认或者取消保证数据的一致性。可靠性解决了 XA 协议的协调者单点故障问题由主业务方发起并控制整个业务活动业务活动管理器也变成多点引入集群 缺点 TCC 的 Try、Confirm 和 Cancel 操作功能要按具体业务来实现业务耦合度较高提高了开发成本。 方案4SAGA 概念 Saga是分布式事务领域最有名气的解决方案之一最初出现在1987年Hector Garcaa-Molrna Kenneth Salem发表的论文SAGAS里。 Saga是由一系列的本地事务构成。每一个本地事务在更新完数据库之后会发布一条消息或者一个事件来触发Saga中的下一个本地事务的执行。如果一个本地事务因为某些业务规则无法满足而失败Saga会执行在这个失败的事务之前成功提交的所有事务的补偿操作。 Saga的实现有很多种方式其中最流行的两种方式是 命令协调Order Orchestrator这种方式的工作形式就像一只乐队由一个指挥家协调中心来协调大家的工作。协调中心来告诉Saga的参与方应该执行哪一个本地事务。 事件编排Event Choreographyo这种方式没有协调中心整个模式的工作方式就像舞蹈一样各个舞蹈演员按照预先编排的动作和走位各自表演最终形成一只舞蹈。处于当前Saga下的各个服务会产生某类事件或者监听其它服务产生的事件并决定是否需要针对监听到的事件做出响应。 命令协调 中央协调器Orchestrator简称 OSO以命令/回复的方式与每项服务进行通信全权负责告诉每个参与者该做什么以及什么时候该做什么。 主业务接口发起事务业务开启订单事务Saga协调器库存服务请求扣减库存库存服务操作后回复处理结果。Saga协调器账户服务请求扣减余额账户服务操作后回复处理结果。处理结果。Saga协调器订单服务请求创建订单订单服务操作后回复主业务逻辑接收并处理Saga协调器事务处理结果回复。 中央协调器 OSO 必须事先知道执行整个事务所需的流程如果有任何失败它还负责通过向每个参与者发送命令来撤销之前的操作来协调分布式的回滚基于中央协调器协调一切时回滚要容易得多因为协调器默认是执行正向流程回滚时只要执行反向流程即可。 执行顺序 A–B–C 回滚顺序 C–B—A 事件编排 在基于事件的方式中第一个服务执行完本地事务之后会产生一个事件。其它服务会监听这个事件触发该服务本地事务的执行并产生新的事件。当最后一个服务执行本地事务并且不发布任何事件时意味着分布式事务结束或者它发布的事件没有被任何 Saga 参与者听到都意味着事务结束。 主业务接口发布下单事件。库存服务监听下单事件扣减库存并发布库存已扣减事件。账户服务监听已扣减库存事件扣减余额并发已扣减余额事件。订单服务监听已扣减余额事件创建订单并发布下单成功事件。主业务逻辑监听下单成功事件后执行后续处理。 异常恢复 前面讲到saga模式在本地事务因为某些业务规则无法满足而失败Saga会执行在这个失败的事务之前成功提交的所有事务的补偿操作。 上面意思可以理解为saga模式下每个事务参与者提供一对接口一个做正常事务操作一个做异常事务回滚操作。比如支付与退款扣款与回补等。 saga支持事务恢复策略 向后恢复(backward recovery) 当执行事务失败时补偿所有已完成的事务是“一退到底”的方式这种做法的效果是撤销掉之前所有成功的子事务使得整个 Saga 的执行结果撤销。 从上图可知事务执行到了支付事务T3但是失败了因此事务回滚需要从C3,C2,C1依次进行回滚补偿对应的执行顺序为T1,T2,T3,C3,C2,C1。 向前恢复(forward recovery) 对于执行不通过的事务会尝试重试事务这里有一个假设就是每个子事务最终都会成功这种方式适用于必须要成功的场景事务失败了重试不需要补偿。 优缺点 命令协调设计 优点 服务之间关系简单避免服务间循环依赖因为 Saga 协调器会调用 Saga 参与者但参与者不会调用协调器。程序开发简单只需要执行命令/回复(其实回复消息也是一种事件消息)降低参与者的复杂性。易维护扩展在添加新步骤时事务复杂性保持线性回滚更容易管理更容易实施和测试。 缺点 中央协调器处理逻辑容易变得庞大复杂导致难以维护。存在协调器单点故障风险。 事件编排设计 优点 避免中央协调器单点故障风险。当涉及的步骤较少服务开发简单容易实现。 缺点 服务之间存在循环依赖的风险。当涉及的步骤较多服务间关系混乱难以追踪调测。 命令VS事件 命令协调方式与事件编排方式2者怎么选择 系统复杂性如果系统的业务逻辑复杂事务需要严格控制和编排命令方式可以提供更好的可见性和可控性。系统扩展性如果系统需要频繁扩展和修改需要一定的灵活性事件方式可以提供解耦和扩展性更好的架构。性能需求如果需要更好的性能和可伸缩性并行执行事务的各个步骤事件方式更适合。异步需求如果系统需要异步处理和解耦事件方式提供了更好的可行性。 方案5本地消息表【扩展】 操作流程 本地消息表的核心思路就是将分布式事务拆分成本地事务进行处理在该方案中主要有两种角色事务主动方和事务被动方。事务主动发起方需要额外新建事务消息表并在本地事务中完成业务处理和记录事务消息并轮询事务消息表的数据发送事务消息事务被动方基于消息中间件消费事务消息表中的事务。 操作步骤 1发生分布式事务操作时 事务主动方在DB1中的操作业务表 记录事务信息在消息表中状态为未处理 2事务主动方向消息中间件推送一个事务操作消息并通知事务被动方处理事务消息。 3事务被动方监控消息中间件读取事务消息完成DB2中业务操作往消息中间件返回ack 4事务主动方监控消息中间件读取事务消息更新消息表状态为已处理 异常情况处理 当1处理出错事务主动方在本地事务中直接回滚就行。 当2处理出错由于DB1中还是保存事务消息可以设置轮询逻辑将消息重新推送给消息中间件在通知事务被动方。 当3处理出错重复获取消息重复执行即可。 如果是业务上处理失败事务被动方可以发消息给事务主动方回滚事务 如果事务被动方已经消费了消息事务主动方需要回滚事务的话需要发消息通知事务主动方进行回滚事务。 优缺点 优点 从应用设计开发的角度实现了消息数据的可靠性消息数据的可靠性不依赖于消息中间件弱化了对 MQ 中间件特性的依赖。方案轻量容易实现。 缺点 与具体的业务场景绑定耦合性强不可公用消息数据与业务数据同库占用业务系统资源业务系统在使用关系型数据库的情况下消息服务性能会受到关系型数据库并发性能的局限 方案6MQ消息事务【扩展】 操作流程 基于MQ的分布式事务方案本质上是对本地消息表的封装整体流程与本地消息表一致唯一不同的就是将本地消息表存在了MQ内部而不是业务数据库中 以RocketMQ为例子 1)事务消息发送及提交 (1) 发送消息(half消息) (2) 服务端响应消息写入结果 (3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见本地逻辑不执行) (4) 根据本地事务状态执行Commit或者Rollback(Commit操作生产消息索引消息对消费者可见) 2) 事务补偿 (1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查” (2) Producer收到回查消息检查回查消息对于的本地事务的状态 (3) 根据本地事务状态重新Commit或者Rollback 其中补偿阶段用户解决消息Commit或者Rollback发生超时或者失效的情况 3) 事务消息状态 事务消息共有三种状态提交状态回查状态中间状态: TransactionStatus.CommitTransaction: 提交事务它允许消费者消费此消息TransactionStatus.RollbackTransaction: 回滚事务它代表消息将被删除不允许被消费TransactionStatus.Unknown: 中间状态它代表需要消息队列来确认状态 优缺点 优点 消息数据独立存储 降低业务系统与消息系统之间的耦合吞吐量大于使用本地消息表方案 缺点 一次消息发送需要两次网络请求(half 消息 commit/rollback 消息) 。业务处理服务需要实现消息状态回查接口。 方案7最大努力通知【扩展】 最大努力通知也称为定期校对是对MQ事务方案的进一步优化。它在事务主动方增加了消息校对的接口如果事务被动方没有接收到主动方发送的消息此时可以调用事务主动方提供的消息校对的接口主动获取 在可靠消息事务中事务主动方需要将消息发送出去并且让接收方成功接收消息这种可靠性发送是由事务主动方保证的但是最大努力通知事务主动方仅仅是尽最大努力重试轮询…将事务发送给事务接收方所以存在事务被动方接收不到消息的情况此时需要事务被动方主动调用事务主动方的消息校对接口查询业务消息并消费这种通知的可靠性是由事务被动方保证的。 方案选择 属性2PC/3PCTCCSaga本地消息表尽最大努力通知(MQ)事务一致性强弱弱弱弱复杂性中高中低低业务侵入性小大小中中使用局限性大大中小中性能低中高高高维护成本低高中低中 2PC/3PC依赖于数据库能够很好的提供强一致性和强事务性但延迟比较高比较适合传统的单体应用在同一个方法中存在跨库操作的情况不适合高并发和高性能要求的场景。TCC适用于执行时间确定且较短实时性要求高对数据一致性要求高比如互联网金融企业最核心的三个服务交易、支付、账务。本地消息表/MQ 事务适用于事务中参与方支持操作幂等对一致性要求不高业务上能容忍数据不一致到一个人工检查周期事务涉及的参与方、参与环节较少业务上有对账/校验系统兜底 性能高。Saga 事务由于 Saga 事务不能保证隔离性需要在业务层控制并发适合于业务场景事务并发操作同一资源较少的情况。Saga 由于缺少预提交动作导致补偿动作的实现比较麻烦例如业务是发送短信补偿动作则得再发送一次短信说明撤销用户体验比较差。所以Saga 事务较适用于补偿动作容易处理的场景 四、分布式事务最佳实践-Seata 简介 官网http://seata.io/zh-cn/ 源码https://github.com/seata/seata Seata 是一款开源的分布式事务解决方案致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式为用户打造一站式的分布式解决方案。 常见术语 TC (Transaction Coordinator) - 事务协调者 维护全局和分支事务的状态驱动全局事务提交或回滚。 TM (Transaction Manager) - 事务管理器 定义全局事务的范围开始全局事务、提交或回滚全局事务。 RM (Resource Manager) - 资源管理器 管理分支事务处理的资源与TC交谈以注册分支事务和报告分支事务的状态并驱动分支事务提交或回滚。 Seata配置 下载 官网https://github.com/seata/seata/releases/tag/v1.7.0 修改配置文件 1.5.0之前的版本配置文件是有多个的都位于conf文件夹下如file.conf,registry,conf等。在1.5.0版本之后都整合到一个配置文件里了即application.yml。以下配置项请按照自己版本查找修改。 以seata-1.7.0为例打开conf/application.yml进行修改重点修改nacos部分配置。 其中的application.example.yml 各种注册中心配置中心配置方式默认是配置本地这里以配置在nacos为例子。 server:port: 7091spring:application:name: seata-serverlogging:config: classpath:logback-spring.xmlfile:path: ${user.home}/logs/seataextend:logstash-appender:destination: 127.0.0.1:4560kafka-appender:bootstrap-servers: 127.0.0.1:9092topic: logback_to_logstashconsole:user:username: seatapassword: seata seata:config:# support: nacos, consul, apollo, zk, etcd3type: nacosnacos:server-addr: 127.0.0.1:8848namespace:group: SEATA_GROUPusername:password:context-path:##if use MSE Nacos with auth, mutex with username/password attribute#access-key:#secret-key:data-id: seataServer.propertiesregistry:# support: nacos, eureka, redis, zk, consul, etcd3, sofatype: nacosnacos:application: seata-serverserver-addr: 127.0.0.1:8848group: SEATA_GROUPnamespace:cluster: defaultusername:password:context-path:##if use MSE Nacos with auth, mutex with username/password attribute#access-key:#secret-key:store:# support: file 、 db 、 redismode: file # server: # service-port: 8091 #If not configured, the default is ${server.port} 1000security:secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017tokenValidityInMilliseconds: 1800000ignore:urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login修改成功后意味着seata将从nacos获取配置信息同时注册自身服务到nacos中心。 nacos配置 上面配置项中有一项seata.config.data-idseataServer.properties意思为要读nacos上的seataServer.properties配置文件接下来去Nacos创建该配置文件注意Group与第2步中的保持一致这里是SEATA_GROUP。 配置内容从seata-server-1.7.0/seata/script/config-center/config.txt粘贴修改而来这里只使用对我们有用的配置主要是数据库配置信息。 #Transaction storage configuration, only for the server. store.modedb store.lock.modedb store.session.modedb#These configurations are required if the store mode is db. store.db.datasourcedruid store.db.dbTypemysql store.db.driverClassNamecom.mysql.cj.jdbc.Driver store.db.urljdbc:mysql://127.0.0.1:3306/seata?useSSLfalseuseUnicodetruerewriteBatchedStatementstrue store.db.userroot store.db.passwordadmin store.db.minConn5 store.db.maxConn30 store.db.globalTableglobal_table store.db.branchTablebranch_table store.db.distributedLockTabledistributed_lock store.db.queryLimit100 store.db.lockTablelock_table store.db.maxWait5000数据库建表 在seata数据库内执行seata-server-1.7.0/seata/script/server/db目录下的sql脚本根据数据库类型创建服务端所需的表。此处选择mysql -- -------------------------------- The script used when storeMode is db -------------------------------- -- the table to store GlobalSession data CREATE TABLE IF NOT EXISTS global_table (xid VARCHAR(128) NOT NULL,transaction_id BIGINT,status TINYINT NOT NULL,application_id VARCHAR(32),transaction_service_group VARCHAR(32),transaction_name VARCHAR(128),timeout INT,begin_time BIGINT,application_data VARCHAR(2000),gmt_create DATETIME,gmt_modified DATETIME,PRIMARY KEY (xid),KEY idx_status_gmt_modified (status , gmt_modified),KEY idx_transaction_id (transaction_id) ) ENGINE InnoDBDEFAULT CHARSET utf8mb4;-- the table to store BranchSession data CREATE TABLE IF NOT EXISTS branch_table (branch_id BIGINT NOT NULL,xid VARCHAR(128) NOT NULL,transaction_id BIGINT,resource_group_id VARCHAR(32),resource_id VARCHAR(256),branch_type VARCHAR(8),status TINYINT,client_id VARCHAR(64),application_data VARCHAR(2000),gmt_create DATETIME(6),gmt_modified DATETIME(6),PRIMARY KEY (branch_id),KEY idx_xid (xid) ) ENGINE InnoDBDEFAULT CHARSET utf8mb4;-- the table to store lock data CREATE TABLE IF NOT EXISTS lock_table (row_key VARCHAR(128) NOT NULL,xid VARCHAR(128),transaction_id BIGINT,branch_id BIGINT NOT NULL,resource_id VARCHAR(256),table_name VARCHAR(32),pk VARCHAR(36),status TINYINT NOT NULL DEFAULT 0 COMMENT 0:locked ,1:rollbacking,gmt_create DATETIME,gmt_modified DATETIME,PRIMARY KEY (row_key),KEY idx_status (status),KEY idx_branch_id (branch_id),KEY idx_xid (xid) ) ENGINE InnoDBDEFAULT CHARSET utf8mb4;CREATE TABLE IF NOT EXISTS distributed_lock (lock_key CHAR(20) NOT NULL,lock_value VARCHAR(20) NOT NULL,expire BIGINT,primary key (lock_key) ) ENGINE InnoDBDEFAULT CHARSET utf8mb4;INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES (AsyncCommitting, , 0); INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES (RetryCommitting, , 0); INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES (RetryRollbacking, , 0); INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES (TxTimeoutCheck, , 0);启动seata-server 运行bin下的bat脚本启动服务。 访问http://127.0.0.1:7091 默认账号与密码都是seata Seata模式使用 XA模式 概念 XA规范是X/Open组织定义的分布式事务处理(DTP)标准XA规范描述了全局的TM与局部RM之间的接口几乎所有主流的数据库都对XA规范提供了支持。 原理 XA是一种分布式事务协议用于在分布式环境下实现事务的一致性。它通过将本地事务Local Transaction与全局事务Global Transaction进行协调来实现分布式事务。 XA模式使用了一组标准的API如Java中的javax.transaction包通常包括两个阶段事务准备Prepare和事务提交Commit。 思考XA与2PC有啥区别 注意点XA模式可以使用2PC协议来实现全局事务的一致性。也就是说XA模式是2PC的一种具体实现方式并且在XA模式中可以利用其他机制如XA事务日志来增强事务的持久性和可靠性。因此可以将XA视为2PC的一个变种或扩展。 seata 的XA模式做了一些调整 操作流程: 第一阶段 1注册全局事务 2调用RM事务接口注册分支事务 3执行RM事务操作不提交 4往TC报告事务状态 第二阶段 1所有RM执行完本地事务TM发起全局事务提交/回滚 2TC检查所有RM事务状态yes or no? ​ 全部yes通知所有RM提交事务 ​ 存在no通知所有RM回滚事务 代码 项目集成seata 依赖 所有微服务导入seata依赖 !-- 注意一定要引入对版本要引入spring-cloud版本seata而不是springboot版本的seata-- dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-seata/artifactId!-- 排除掉springcloud默认的seata版本以免版本不一致出现问题--exclusionsexclusiongroupIdio.seata/groupIdartifactIdseata-spring-boot-starter/artifactId/exclusionexclusiongroupIdio.seata/groupIdartifactIdseata-all/artifactId/exclusion/exclusions /dependency dependencygroupIdio.seata/groupIdartifactIdseata-spring-boot-starter/artifactIdversion1.7.0/version /dependency配置文件 在application.yml文件中配置 每个微服务都要 #seata客户端配置 seata:enabled: trueapplication-id: seata_txtx-service-group: seata_tx_groupservice:vgroup-mapping:seata_tx_group: defaultregistry:type: nacosnacos:application: seata-serverserver-addr: 127.0.0.1:8848namespace:group: SEATA_GROUPdata-source-proxy-mode: XA 其中seata_tx_group为我们自定义的事务组名字随便起但是下面service.vgroup-mapping下一定要有一个对应这个名字的映射映射到defaultseata默认的集群名称。 nacos方面我们仅配置注册项即registry下的配置配置内容与服务端保持一致。 配置全局事务 在business-service服务的purchase 方法中加上全局事务标签GlobalTransactional Override GlobalTransactional public void purchase(String userId, String commodityCode, int orderCount, boolean rollback) {String result stockFeignClient.deduct(commodityCode, orderCount);if (!SUCCESS.equals(result)) {throw new RuntimeException(库存服务调用失败,事务回滚!);}result orderFeignClient.create(userId, commodityCode, orderCount);if (!SUCCESS.equals(result)) {throw new RuntimeException(订单服务调用失败,事务回滚!);}if (rollback) {throw new RuntimeException(Force rollback ... );} }启动seata 测试 正常http://localhost:8088/businesses/purchase?rollbackfalsecount2 超库存http://localhost:8088/businesses/purchase?rollbackfalsecount12 超余额http://localhost:8088/businesses/purchase?rollbackfalsecount8 优缺点 优点 事务强一致性满足ACID原则常用的数据库都支持实现简单并且没有代码侵入 缺点 第一阶段锁定数据库资源等待二阶段结束才释放锁定资源过长性能较差依赖关系型数据库的实现事务 AT模式 概念 AT模式同样是分阶段提交事务模式操作起来算是XA模式的优化版。 XA模式在第一阶段存在锁定资源的操作时间长之后会影响性能。 AT模式在第一阶段直接提交事务弥补了XA模式中资源锁定周期过长缺陷。 原理 操作流程: 第一阶段 1注册全局事务 2调用RM事务接口注册分支事务 3执行RM事务操作并提交记录undo log日志快照 4往TC报告事务状态 第二阶段 1所有RM执行完本地事务TM发起全局事务提交/回滚 2TC检查所有RM事务状态yes or no? ​ 全部yes通知所有RM提交事务删除undo log日志快照 ​ 存在no通知所有RM回滚事务恢复undo log日志快照 XA vs AT XA模式一阶段不提交事务锁定资源 AT模式一阶段直接提交不锁定资源XA模式依赖数据库实现回滚 AT利用数据快照实现数据回顾XA模式强一致性AT模式最终一致(一阶段提交此时有事务查询就存在不一致) 问题 AT模式因为在全局事务中第一阶段就提交了事务释放资源。如果这个时另外RM/外部事务(非RM)操作相同资源可能存在读写隔离问题(更新丢失问题)。 问题出现原理 读写隔离问题-2个seata事务解决方案 加全局锁 读写隔离问题-1个seata事务 非seata事务解决方案 全局锁多级快照 代码 配置seata-AT相关快照/全局锁/快照表数据库 配置数据库 TM数据库-seata 源sql seata-server-1.7.0/script/server/db 中 各个RM数据库 源sql https://seata.io/zh-cn/docs/dev/mode/at-mode.html CREATE TABLE undo_log (id bigint(20) NOT NULL AUTO_INCREMENT,branch_id bigint(20) NOT NULL,xid varchar(100) NOT NULL,context varchar(128) NOT NULL,rollback_info longblob NOT NULL,log_status int(11) NOT NULL,log_created datetime NOT NULL,log_modified datetime NOT NULL,ext varchar(100) DEFAULT NULL,PRIMARY KEY (id),UNIQUE KEY ux_undo_log (xid,branch_id) ) ENGINEInnoDB AUTO_INCREMENT1 DEFAULT CHARSETutf8; 配置文件 在application.yml文件中配置 每个微服务都要 #seata客户端配置 seata:enabled: trueapplication-id: seata_txtx-service-group: seata_tx_groupservice:vgroup-mapping:seata_tx_group: defaultregistry:type: nacosnacos:application: seata-serverserver-addr: 127.0.0.1:8848namespace:group: SEATA_GROUPdata-source-proxy-mode: AT测试 正常http://localhost:8088/businesses/purchase?rollbackfalsecount2 超库存http://localhost:8088/businesses/purchase?rollbackfalsecount12 超余额http://localhost:8088/businesses/purchase?rollbackfalsecount8 优缺点 优点 一阶段完成直接提交事务释放资源性能较好利用全局锁实现读写隔离没有代码侵入框架自动完成回滚与提交 缺点 两阶段之间存在数据不一致情况只能保证最终一致框架的快照功能影响性能但比XA模式要好很多 TCC模式 概念 TCC模式的seata版实现。TCC模式与AT模式非常相似每阶段都是独立事务不同的TCC通过人工编码来实现数据恢复。 Try资源的检测和预留Confirm完成资源操作业务要求Try 成功Confirm 一定要能成功Cancel预留资源释放可以理解为try的反向操作 原理 操作流程: 1注册全局事务 2调用RM事务接口注册分支事务 3执行RM事务try接口检查资源预留资源 4往TC报告事务状态 5所有RM执行完本地事务TM发起全局事务提交/回滚 2TC检查所有RM事务状态yes or no? ​ 全部yes通知所有RM 执行confirm接口提交事务 ​ 存在no通知所有RM 执行cancel接口回滚事务 案例演示 问题 TCC模式中在执行Try执行Confirm执行Cancel 过程中会出现意外情况导致TCC模式经典问题空回滚业务悬挂重试幂等问题。 空回滚 当某个分支事务try阶段阻塞时可能导致全局事务超时而触发二阶段的cancel操作RM在没有执行try操作就执行cancel操作此时cancel无数据回滚这就是空回滚。 业务悬挂 当发生的空回滚之后当阻塞的Try正常了RM先执行空回滚(cancel)后又收到Try操作指令执行业务操作并冻结资源。但是事务已经结束不会再有confirm 或cancel了那刚执行try操作冻结资源就被悬挂起来了。这就是业务悬挂 重试幂等 因为网络抖动等原因TC下达的Confirm/Cancel 指令可能出现延时发送失败等问题此时TC会启用重试机制到时RM可能收到多个confirm或cancel指令这就要求confirm接口或者cancel接口需要能够保证幂等性。 幂等性多次执行结果都一样 上面空回滚/业务悬挂问题解决一般都一起实现引入事务状态控制表 表字段 xid冻结数据事务状态(try、confirm/cancel) 以RM: account-service 中用户账户余额为例子。 try1在状态表中记录冻结金额与事务状态为try2扣减账户余额 confirm1根据xid删除状态表中冻结记录 cancel1修改状态表冻结金额为0事务状态改为cancel 2恢复账户扣减 如何判断是否为空回滚在cancel中根据xid查询状态表如果为不存在说明try执行需要空回滚 如果避免业务悬挂try业务中根据xid查询状态表如果已经存在说明已经执行过cancel已经执行过拒绝执行try业务。 重试幂等需要引入唯一标识比如第一次操作成功留下唯一标识下次来识别这个标识。 代码 在AT模式基础上做代码TCC改造就行。 account-service服务 新增IAccountTCCService接口与实现 package cn.xxx.tx.account.service;import io.seata.rm.tcc.api.BusinessActionContext; import io.seata.rm.tcc.api.BusinessActionContextParameter; import io.seata.rm.tcc.api.LocalTCC; import io.seata.rm.tcc.api.TwoPhaseBusinessAction;/*** TCC 二阶段提交业务接口*/ LocalTCC public interface IAccountTCCService {/*** try-预扣款*/TwoPhaseBusinessAction(nametryReduce, commitMethod confirm, rollbackMethod cancel)void tryReduce(BusinessActionContextParameter(paramName userId) String userId,BusinessActionContextParameter(paramName money) int money);/*** confirm-提交* param ctx* return*/boolean confirm(BusinessActionContext ctx);/*** cancel-回滚* param ctx* return*/boolean cancel(BusinessActionContext ctx); } package cn.xxx.tx.account.service.impl;import cn.xxx.tx.account.domain.Account; import cn.xxx.tx.account.mapper.AccountMapper; import cn.xxx.tx.account.service.IAccountService; import cn.xxx.tx.account.service.IAccountTCCService; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import io.seata.rm.tcc.api.BusinessActionContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional;import java.awt.*;Service public class AccountTCCServiceImpl implements IAccountTCCService {Autowiredprivate AccountMapper accountMapper;Overridepublic void tryReduce(String userId, int money) {System.err.println(-----------tryReduce-------------);Account one accountMapper.selectOne(new LambdaQueryWrapperAccount().eq(Account::getUserId, userId));if(one ! null one.getMoney() money){throw new RuntimeException(Not Enough Money ...);}LambdaUpdateWrapperAccount wrapper new LambdaUpdateWrapper();wrapper.setSql(money money - money);wrapper.eq(Account::getUserId, userId);accountMapper.update(null, wrapper);}Overridepublic boolean confirm(BusinessActionContext ctx) {System.err.println(-----------confirm-------------);return true;}Overridepublic boolean cancel(BusinessActionContext ctx) {System.err.println(-----------cancel-------------);return true;} } controller改动 package cn.xxx.tx.account.controller;import cn.xxx.tx.account.domain.Account; import cn.xxx.tx.account.service.IAccountService; import cn.xxx.tx.account.service.IAccountTCCService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*;RestController RequestMapping(accounts) public class AccountController {//Autowired//private IAccountService accountService;Autowiredprivate IAccountTCCService accountTCCService;GetMapping(value /reduce)public String reduce(String userId, int money) {try {accountTCCService.tryReduce(userId, money);} catch (Exception exx) {exx.printStackTrace();return FAIL;}return SUCCESS;}} order-service服务 新增IOrderTCCService接口与实现 package cn.xxx.tx.order.service;import cn.xxx.tx.order.domain.Order; import com.baomidou.mybatisplus.extension.service.IService; import io.seata.rm.tcc.api.BusinessActionContext; import io.seata.rm.tcc.api.BusinessActionContextParameter; import io.seata.rm.tcc.api.LocalTCC; import io.seata.rm.tcc.api.TwoPhaseBusinessAction;/*** TCC 二阶段提交业务接口*/ LocalTCC public interface IOrderTCCService {/*** try-预扣款*/TwoPhaseBusinessAction(nametryCreate, commitMethod confirm, rollbackMethod cancel)void tryCreate(BusinessActionContextParameter(paramName userId) String userId,BusinessActionContextParameter(paramName commodityCode) String commodityCode,BusinessActionContextParameter(paramName orderCount) int orderCount);/*** confirm-提交* param ctx* return*/boolean confirm(BusinessActionContext ctx);/*** cancel-回滚* param ctx* return*/boolean cancel(BusinessActionContext ctx);} package cn.xxx.tx.order.service.impl;import cn.xxx.tx.order.domain.Order; import cn.xxx.tx.order.feign.AccountFeignClient; import cn.xxx.tx.order.mapper.OrderMapper; import cn.xxx.tx.order.service.IOrderService; import cn.xxx.tx.order.service.IOrderTCCService; import com.alibaba.nacos.shaded.org.checkerframework.checker.units.qual.A; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import io.seata.rm.tcc.api.BusinessActionContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional;Service public class OrderTCCServiceImpl implements IOrderTCCService {Autowiredprivate AccountFeignClient accountFeignClient;Autowiredprivate OrderMapper orderMapper;Overridepublic void tryCreate(String userId, String commodityCode, int count) {System.err.println(---------tryCreate-----------);// 定单总价 订购数量(count) * 商品单价(100)int orderMoney count * 100;// 生成订单Order order new Order();order.setCount(count);order.setCommodityCode(commodityCode);order.setUserId(userId);order.setMoney(orderMoney);orderMapper.insert(order);// 调用账户余额扣减String result accountFeignClient.reduce(userId, orderMoney);if (!SUCCESS.equals(result)) {throw new RuntimeException(Failed to call Account Service. );}}Overridepublic boolean confirm(BusinessActionContext ctx) {System.err.println(---------confirm-----------);return true;}Overridepublic boolean cancel(BusinessActionContext ctx) {System.err.println(---------cancel-----------);return true;} } controller改动 package cn.xxx.tx.order.controller;import cn.xxx.tx.order.domain.Order; import cn.xxx.tx.order.service.IOrderService; import cn.xxx.tx.order.service.IOrderTCCService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*;RestController RequestMapping(orders) public class OrderController {Autowiredprivate IOrderTCCService orderTCCService;GetMapping(value /create)public String create(String userId, String commodityCode, int orderCount) {try {orderTCCService.tryCreate(userId, commodityCode, orderCount);} catch (Exception exx) {exx.printStackTrace();return FAIL;}return SUCCESS;} } stock-service服务 新增IStockTCCService接口与实现 package cn.xxx.tx.stock.service;import cn.xxx.tx.stock.domain.Stock; import com.baomidou.mybatisplus.extension.service.IService; import io.seata.rm.tcc.api.BusinessActionContext; import io.seata.rm.tcc.api.BusinessActionContextParameter; import io.seata.rm.tcc.api.LocalTCC; import io.seata.rm.tcc.api.TwoPhaseBusinessAction;/*** TCC 二阶段提交业务接口*/ LocalTCC public interface IStockTCCService {/*** try-预扣款*/TwoPhaseBusinessAction(nametryDeduct, commitMethod confirm, rollbackMethod cancel)void tryDeduct(BusinessActionContextParameter(paramName commodityCode) String commodityCode,BusinessActionContextParameter(paramName count) int count);/*** confirm-提交* param ctx* return*/boolean confirm(BusinessActionContext ctx);/*** cancel-回滚* param ctx* return*/boolean cancel(BusinessActionContext ctx); } package cn.xxx.tx.stock.service.impl;import cn.xxx.tx.stock.domain.Stock; import cn.xxx.tx.stock.mapper.StockMapper; import cn.xxx.tx.stock.service.IStockService; import cn.xxx.tx.stock.service.IStockTCCService; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import io.seata.rm.tcc.api.BusinessActionContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional;Service public class StockTCCServiceImpl implements IStockTCCService {Autowiredprivate StockMapper stockMapper;Overridepublic void tryDeduct(String commodityCode, int count) {System.err.println(---------tryDeduct-----------);Stock one stockMapper.selectOne(new LambdaQueryWrapperStock().eq(Stock::getCommodityCode, commodityCode));if(one ! null one.getCount() count){throw new RuntimeException(Not Enough Count ...);}stockMapper.update(null, new LambdaUpdateWrapperStock().setSql(count count- count).eq(Stock::getCommodityCode, commodityCode));}Overridepublic boolean confirm(BusinessActionContext ctx) {System.err.println(---------confirm-----------);return true;}Overridepublic boolean cancel(BusinessActionContext ctx) {System.err.println(---------cancel-----------);return true;} } controller改动 package cn.xxx.tx.stock.controller;import cn.xxx.tx.stock.service.IStockService; import cn.xxx.tx.stock.service.IStockTCCService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*;RestController RequestMapping(stocks) public class StockController {Autowiredprivate IStockTCCService stockTCCService;GetMapping(value /deduct)public String deduct(String commodityCode, int count) {try {stockTCCService.tryDeduct(commodityCode, count);} catch (Exception exx) {exx.printStackTrace();return FAIL;}return SUCCESS;} } 上面操作在理想情况下是没有问题的但是一旦出现需要回滚操作就出问题了无法进行数据回补。此时就需要使用到事务状态表实现数据回补同时实现空回滚避免业务悬挂。 account-service 在seata-account 新增事务状态表 CREATE TABLE t_account_tx (id bigint NOT NULL AUTO_INCREMENT COMMENT 主键,tx_id varchar(100) NOT NULL COMMENT 事务id,freeze_money int DEFAULT NULL COMMENT 冻结金额,state int DEFAULT NULL COMMENT 状态 0try 1confirm 2cancel,PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf8;新增domainAccountTX Data TableName(t_account_tx) public class AccountTX {public static final int STATE_TRY 0;public static final int STATE_CONFIRM 1;public static final int STATE_CANCEL 2;TableId(type IdType.AUTO)private Integer id;private String txId;private int freezeMoney;private int state STATE_TRY; } 新增mapperAccountTXMapper package cn.xxx.tx.account.mapper;import cn.xxx.tx.account.domain.AccountTX; import com.baomidou.mybatisplus.core.mapper.BaseMapper;public interface AccountTXMapper extends BaseMapperAccountTX { }修改AccountTCCServiceImpl package cn.xxx.tx.account.service.impl;import cn.xxx.tx.account.domain.Account; import cn.xxx.tx.account.domain.AccountTX; import cn.xxx.tx.account.mapper.AccountMapper; import cn.xxx.tx.account.mapper.AccountTXMapper; import cn.xxx.tx.account.service.IAccountService; import cn.xxx.tx.account.service.IAccountTCCService; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import io.seata.core.context.RootContext; import io.seata.rm.tcc.api.BusinessActionContext; import org.checkerframework.checker.units.qual.A; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional;import java.awt.*;Service public class AccountTCCServiceImpl implements IAccountTCCService {Autowiredprivate AccountMapper accountMapper;Autowiredprivate AccountTXMapper accountTXMapper;Overridepublic void tryReduce(String userId, int money) {System.err.println(-----------tryReduce------------- RootContext.getXID());//业务悬挂AccountTX accountTX accountTXMapper.selectOne(new LambdaQueryWrapperAccountTX().eq(AccountTX::getTxId, RootContext.getXID()));if (accountTX ! null){//存在说明已经canel执行过类拒绝服务return;}Account one accountMapper.selectOne(new LambdaQueryWrapperAccount().eq(Account::getUserId, userId));if(one ! null one.getMoney() money){throw new RuntimeException(Not Enough Money ...);}LambdaUpdateWrapperAccount wrapper new LambdaUpdateWrapper();wrapper.setSql(money money - money);wrapper.eq(Account::getUserId, userId);accountMapper.update(null, wrapper);AccountTX tx new AccountTX();tx.setFreezeMoney(money);tx.setTxId(RootContext.getXID());tx.setState(AccountTX.STATE_TRY);accountTXMapper.insert(tx);}Overridepublic boolean confirm(BusinessActionContext ctx) {System.err.println(-----------confirm-------------);//删除记录int ret accountTXMapper.delete(new LambdaQueryWrapperAccountTX().eq(AccountTX::getTxId, ctx.getXid()));return ret 1;}Overridepublic boolean cancel(BusinessActionContext ctx) {System.err.println(-----------cancel-------------);String userId ctx.getActionContext(userId).toString();String money ctx.getActionContext(money).toString();AccountTX accountTX accountTXMapper.selectOne(new LambdaQueryWrapperAccountTX().eq(AccountTX::getTxId, ctx.getXid()));if (accountTX null){//为空 空回滚accountTX new AccountTX();accountTX.setTxId(ctx.getXid());accountTX.setState(AccountTX.STATE_CANCEL);if(money ! null){accountTX.setFreezeMoney(Integer.parseInt(money));}accountTXMapper.insert(accountTX);return true;}//幂等处理if(accountTX.getState() AccountTX.STATE_CANCEL){return true;}//恢复余额accountMapper.update(null, new LambdaUpdateWrapperAccount().setSql(money money money).eq(Account::getUserId, userId));accountTX.setFreezeMoney(0);accountTX.setState(AccountTX.STATE_CANCEL);int ret accountTXMapper.updateById(accountTX);return ret 1;} } order-service 在seata-order 新增事务状态表 CREATE TABLE t_order_tx (id bigint NOT NULL AUTO_INCREMENT COMMENT 主键,tx_id varchar(100) NOT NULL COMMENT 事务id,state int DEFAULT NULL COMMENT 状态 0try 1confirm 2cancel,PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf8;新增domainOrderTX package cn.xxx.tx.order.domain;import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data;Data TableName(t_order_tx) public class OrderTX {public static final int STATE_TRY 0;public static final int STATE_CONFIRM 1;public static final int STATE_CANCEL 2;TableId(type IdType.AUTO)private Integer id;private String txId;private int state STATE_TRY; } 新增mapperOrderTXMapper package cn.xxx.tx.order.mapper;import cn.xxx.tx.order.domain.OrderTX; import com.baomidou.mybatisplus.core.mapper.BaseMapper;public interface OrderTXMapper extends BaseMapperOrderTX { } 修改OrderTCCServiceImpl package cn.xxx.tx.order.service.impl;import cn.xxx.tx.order.domain.Order; import cn.xxx.tx.order.domain.OrderTX; import cn.xxx.tx.order.feign.AccountFeignClient; import cn.xxx.tx.order.mapper.OrderMapper; import cn.xxx.tx.order.mapper.OrderTXMapper; import cn.xxx.tx.order.service.IOrderService; import cn.xxx.tx.order.service.IOrderTCCService; import com.alibaba.nacos.shaded.org.checkerframework.checker.units.qual.A; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import io.seata.core.context.RootContext; import io.seata.rm.tcc.api.BusinessActionContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional;Service public class OrderTCCServiceImpl implements IOrderTCCService {Autowiredprivate AccountFeignClient accountFeignClient;Autowiredprivate OrderMapper orderMapper;Autowiredprivate OrderTXMapper orderTXMapper;Overridepublic void tryCreate(String userId, String commodityCode, int count) {System.err.println(---------tryCreate-----------);//业务悬挂OrderTX orderTX orderTXMapper.selectOne(new LambdaQueryWrapperOrderTX().eq(OrderTX::getTxId, RootContext.getXID()));if (orderTX ! null){//存在说明已经canel执行过类拒绝服务return;}// 定单总价 订购数量(count) * 商品单价(100)int orderMoney count * 100;// 生成订单Order order new Order();order.setCount(count);order.setCommodityCode(commodityCode);order.setUserId(userId);order.setMoney(orderMoney);orderMapper.insert(order);OrderTX tx new OrderTX();tx.setTxId(RootContext.getXID());tx.setState(OrderTX.STATE_TRY);orderTXMapper.insert(tx);// 调用账户余额扣减String result accountFeignClient.reduce(userId, orderMoney);if (!SUCCESS.equals(result)) {throw new RuntimeException(Failed to call Account Service. );}}Overridepublic boolean confirm(BusinessActionContext ctx) {System.err.println(---------confirm-----------);//删除记录int ret orderTXMapper.delete(new LambdaQueryWrapperOrderTX().eq(OrderTX::getTxId, ctx.getXid()));return ret 1;}Overridepublic boolean cancel(BusinessActionContext ctx) {System.err.println(---------cancel----------- );String userId ctx.getActionContext(userId).toString();String commodityCode ctx.getActionContext(commodityCode).toString();OrderTX orderTX orderTXMapper.selectOne(new LambdaQueryWrapperOrderTX().eq(OrderTX::getTxId, ctx.getXid()));if (orderTX null){//为空 空回滚orderTX new OrderTX();orderTX.setTxId(ctx.getXid());orderTX.setState(OrderTX.STATE_CANCEL);orderTXMapper.insert(orderTX);return true;}//幂等处理if(orderTX.getState() OrderTX.STATE_CANCEL){return true;}//恢复余额orderMapper.delete(new LambdaQueryWrapperOrder().eq(Order::getUserId, userId).eq(Order::getCommodityCode, commodityCode));orderTX.setState(OrderTX.STATE_CANCEL);int ret orderTXMapper.updateById(orderTX);return ret 1;} } stock-service 在seata-stock 新增事务状态表 CREATE TABLE t_stock_tx (id bigint NOT NULL AUTO_INCREMENT COMMENT 主键,tx_id varchar(100) NOT NULL COMMENT 事务id,count int DEFAULT NULL COMMENT 冻结库存,state int DEFAULT NULL COMMENT 状态 0try 1confirm 2cancel,PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf8;新增domainStockTX package cn.xxx.tx.stock.domain;import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data;Data TableName(t_stock_tx) public class StockTX {public static final int STATE_TRY 0;public static final int STATE_CONFIRM 1;public static final int STATE_CANCEL 2;TableId(type IdType.AUTO)private Integer id;private String txId;private int count;private int state STATE_TRY; } 新增mapperStockTXMapper package cn.xxx.tx.stock.mapper;import cn.xxx.tx.stock.domain.StockTX; import com.baomidou.mybatisplus.core.mapper.BaseMapper;public interface StockTXMapper extends BaseMapperStockTX { } 修改StockTCCServiceImpl package cn.xxx.tx.stock.service.impl;import cn.xxx.tx.stock.domain.Stock; import cn.xxx.tx.stock.domain.StockTX; import cn.xxx.tx.stock.mapper.StockMapper; import cn.xxx.tx.stock.mapper.StockTXMapper; import cn.xxx.tx.stock.service.IStockService; import cn.xxx.tx.stock.service.IStockTCCService; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import io.seata.core.context.RootContext; import io.seata.rm.tcc.api.BusinessActionContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional;Service public class StockTCCServiceImpl implements IStockTCCService {Autowiredprivate StockMapper stockMapper;Autowiredprivate StockTXMapper stockTXMapper;Overridepublic void tryDeduct(String commodityCode, int count) {System.err.println(---------tryDeduct-----------);//业务悬挂StockTX stockTX stockTXMapper.selectOne(new LambdaQueryWrapperStockTX().eq(StockTX::getTxId, RootContext.getXID()));if (stockTX ! null){//存在说明已经canel执行过类拒绝服务return;}Stock one stockMapper.selectOne(new LambdaQueryWrapperStock().eq(Stock::getCommodityCode, commodityCode));if(one ! null one.getCount() count){throw new RuntimeException(Not Enough Count ...);}stockMapper.update(null, new LambdaUpdateWrapperStock().setSql(count count- count).eq(Stock::getCommodityCode, commodityCode));StockTX tx new StockTX();tx.setCount(count);tx.setTxId(RootContext.getXID());tx.setState(StockTX.STATE_TRY);stockTXMapper.insert(tx);}Overridepublic boolean confirm(BusinessActionContext ctx) {System.err.println(---------confirm-----------);//删除记录int ret stockTXMapper.delete(new LambdaQueryWrapperStockTX().eq(StockTX::getTxId, ctx.getXid()));return ret 1;}Overridepublic boolean cancel(BusinessActionContext ctx) {System.err.println(---------cancel-----------);String count ctx.getActionContext(count).toString();String commodityCode ctx.getActionContext(commodityCode).toString();StockTX stockTX stockTXMapper.selectOne(new LambdaQueryWrapperStockTX().eq(StockTX::getTxId, ctx.getXid()));if (stockTX null){//为空 空回滚stockTX new StockTX();stockTX.setTxId(ctx.getXid());stockTX.setState(StockTX.STATE_CANCEL);if(count ! null){stockTX.setCount(Integer.parseInt(count));}stockTXMapper.insert(stockTX);return true;}//幂等处理if(stockTX.getState() StockTX.STATE_CANCEL){return true;}//恢复余额stockMapper.update(null, new LambdaUpdateWrapperStock().setSql(count count count).eq(Stock::getCommodityCode, commodityCode));stockTX.setCount(0);stockTX.setState(StockTX.STATE_CANCEL);int ret stockTXMapper.updateById(stockTX);return ret 1;} } 测试 正常http://localhost:8088/businesses/purchase?rollbackfalsecount2 超库存http://localhost:8088/businesses/purchase?rollbackfalsecount12 超余额http://localhost:8088/businesses/purchase?rollbackfalsecount8 优缺点 优点 一阶段完成直接提交事务释放数据库资源性能好相比AT模型无需生成快照无需使用全局锁性能最强不依赖数据库事务而是依赖补偿操作可以用于非事务性数据库 缺点 代码侵入需要认为编写tryconfirm和cancel接口麻烦没提交/回滚事务前数据是不一致的事务属于最终一致需要考虑confirm 和cancel失败情况要做好幂等处理 SAGA模式[扩展] 概述 Saga模式是SEATA提供的长事务解决方案在Saga模式中业务流程中每个参与者都提交本地事务当出现某一个参与者失败则补偿前面已经成功的参与者一阶段正向服务和二阶段补偿服务都由业务开发实现。 简单理解 saga模式也分为2个阶段 一阶段 直接提交本地事务(所有RM) 二阶段一阶段成功了啥都不做如果存在某个RM本地事务失败则编写补偿业务(反向操作)来实现回滚 原理 左边是所有参与者事务右边是补偿反向操作 正常执行顺序 T1–T2–T3–TN 需要回滚执行顺序T1–T2–T3–TN—回滚—TN—T3—T2—T1 优缺点 优点 事务参与者可以居于事件驱动实现异步调用吞吐量高一阶段直接提交事务无锁性能好不用编写TCC中的三哥阶段实现简单 缺点 一阶段到二阶段时间不定时效性差没有锁没有事务隔离会有脏写可能 模式选择 XAATTCCSAGA一致性强一致弱一致弱一致最终一致隔离性完全隔离基于全局锁基于资源预留隔离无隔离代码侵入无无有编写3个接口有编写状态机和补偿业务性能差好非常好非常好场景对一致性、隔离性有高要求的业务居于关系型数据库的大部分分布式事务场景都可以对性能要求较高的事务有非关系型数据参与的事务业务流程长业务流程多参与者包含其他公司或者遗留系统服务无法提供TCC模式要求的是3个接口
http://www.zqtcl.cn/news/437141/

相关文章:

  • 基于php的网站开发流程图如何建设一个公众号电影网站
  • 2018年怎么做网站排名如何提升网站的收录量
  • 租电信服务器开网站为何要屏蔽网站快照
  • 广州建设网站技术企业咨询属于什么行业
  • 哪些网站容易做网站开发价格
  • 展览网站源码棋牌游戏软件开发
  • 网站开发业务ppt做网站如何放入图像
  • 专业做网站和小程序车载网络设计是干什么的
  • 运城网站建设兼职建设通网站武义巨合汪志刚
  • 广州网站建设公司排行个人介绍网页设计模板图片
  • 东莞营销网站建设多少钱wordpress开场动画
  • 网站建设问题及解决办法网站优化过度的表现
  • html5手机网站教程合肥企业网站营销电话
  • 公司网站引导页建设银行网站怎么登录密码忘了怎么办
  • iis7 网站打不开做兼职哪个网站好
  • 惠州网站制作网站iot物联网平台开发
  • 龙岩门户网站最新仿58同城网站源码
  • 简单的企业小网站南宁最新消息今天
  • 美橙表业手表网站公司推广渠道
  • 大连网站排名优化价格wordpress锚文字
  • 漯河网做网站南京市建设工程档案馆网站
  • 重庆可以建建网站的平台天眼查 企业查询官网
  • gta5单机买房子网站在建设免费建小程序网站
  • 怎么制作网站设计图片劳动保障局瓯海劳务市场和做网站
  • 视屏网站制作青岛平台网站建设
  • asp网站做搜索义乌网站建设工作室
  • .net网站开发环境wordpress添加特效
  • 常州 网站制作如何找专业的网站建设公司
  • 陕西网络营销优化公司seo搜索价格
  • 山东通信局报备网站东营城镇建设规划网站