当前位置: 首页 > news >正文

制作公司内部网站深圳酒店网站建设

制作公司内部网站,深圳酒店网站建设,建公司网站一般多少钱,深圳优化公司高粱seo较简介#xff1a; 在柔性事务的多种实现中#xff0c;事务消息是最为优雅易用的一种。基于阿里云RocketMQ高性能、高可用的特点#xff0c;完全可以胜任抢购业务这类高并发大流量的场景。但引入事务消息机制在实现高性能的同时#xff0c;也增加了整体的业务复杂度。我们需要… 简介 在柔性事务的多种实现中事务消息是最为优雅易用的一种。基于阿里云RocketMQ高性能、高可用的特点完全可以胜任抢购业务这类高并发大流量的场景。但引入事务消息机制在实现高性能的同时也增加了整体的业务复杂度。我们需要对业务场景进行充分评估选择与自身业务特点最为匹配的一种才能更好地发挥柔性事务的价值。 前言 在电商领域抢购和秒杀是非常普遍业务模式抢购类业务在快速拉升用户流量并为消息者带来实惠的同时也给电商系统带来了巨大考验。在高并发、大流量的冲击下系统的性能和稳定性至关重要任何一个环节出现故障都会影响整体的购物体验甚至造成电商系统的大面积崩溃。和电商领域抢购场景极为类似的业务模式还有很多比如大型赛事和在线教育的报名系统以及各类购票系统等。 针对抢购类业务在技术上带来的挑战业界有一系列解决方案通过不同维度来提升系统的性能与稳定性包括动静分离、定时上架、异步处理、令牌队列、多级缓存、作弊行为侦测、流量防护、全链路压测等。 本文重点聚焦在如何确保抢购类业务的一致性上分布式事务一直是IT界老大难的问题而抢购业务所具备的高并发、大流量特征更是成倍增加了分布式一致性的实现难度。以下将介绍如何通过事务消息构建满足抢购类业务要求的高性能高可用分布式一致性机制。 事务一致性原理回顾 事务是应用程序中一系列严密的操作这一系列操作是一个不可分割的工作单位它们要么全部执行成功要么一个都不做。事务具有四个特征原子性 Atomicity 、一致性 Consistency 、隔离性 Isolation 和持续性 Durability 这四个特性简称为 ACID 特性。 在非并发状态下保证事务的ACID特性是轻而易举的事情如果某一个操作执行不成功把前面的操作全部回滚就OK了。而在并发状态下由于有多个事务同时操作同一个资源对于事务ACID特性的保证就会困难一些如果考虑得不周全就会遇到如下几个问题 脏读事务A读到了事务B还没有提交的数据。不可重复读在一个事务里面对某个数据读取了两次读出来的数据不一致。幻读 在一个事务对某个数据集用同样的方式读取了两次数据集的条目数量不一致。 为了应对上述并发情况下出现的问题就需要通过一定的事务隔离级别来解决。当事务的隔离级别越高的时候上述问题发生的机会就越小但是性能消耗也会越大。所以在实际生产过程中要根据实际需求去确定隔离级别 READ_UNCOMMITTED读未提交最低的隔离级别可以读到未提交的数据无法解决脏读、不可重复读、幻读中的任何一种。READ_COMMITED 读已提交能够防止脏读但是无法解决不可重复读和幻读的问题。REPEATABLE_READ 重复读取对同一条数据的多次重复读取能保持一致解决了脏读、不可重复读的问题但是幻读的问题还是无法解决。SERLALIZABLE 串行化最高的事务隔离级别避免了事务的并行执行解决了脏读、不可重复读和幻读的问题但性能最低。 关系型数据库提供了对于事务的支持能够通过不同隔离级别的设置确保并发状态下事务的ACID特性。但关系型数据库提供的能力是基于单机事务的一旦遇到分布式事务场景就需要通过更多其他技术手段来解决问题。 抢购业务中的分布式事务 有如下三种情况可能会产生分布式事务 一个事务操作包含对两个数据库的操作数据库所提供的事务保证仅能局限在对于自身的操作上无法跨越到其他数据库。2、一个事务包含对多个数据分片的操作具体的分片规则由分库分表中间件或者分库分表SDK来实现有可能跨越多个数据库或同一个数据库的多个表。对于业务逻辑而言底层的数据分片情况是不透明的因此也没有办法依赖于数据库提供的单机事务机制。 3、一个事务包括对多个服务的调用在微服务领域这是极为常见的场景不同的服务使用不同的数据资源甚至涉及到更为复杂的调用链路。在这种情况下数据库提供的单机事务机制仅仅能保证其中单一环节的ACID特性没有办法延伸到全局。 微服务技术在电商领域的普及程度是非常高的比较大型的电商应用还会通过中台思想将共性业务能力进行沉淀因此抢购业务中的很多环境都属于跨服务的分布式调用会涉及到上述第三种分布式事务形态。比如在订单支付成功后交易中心会调用订单中心的服务把订单状态更新并调用物流中心的服务通知商品发货同时还要调用积分中心的服务为用户增加相应的积分。如何保障分布式事务一致性成为了确保抢购业务稳定运行的核心诉求之一。 分布式事务的实现方式 传统分布式事务 传统的分布式事务通过XA模型实现通过一个事务协调者站在全局的角度将多个子事务合并成一个分布式事务。XA模型之所以能在分布式事务领域得到广泛使用是因为其具有如下两个方面的优势 提供了强一致性保证在业务执行的任何时间点都能确保事务一致性。使用简单。常见的关系型数据库都提供了对XA协议的支持通过引入事务协调器业务代码跟使用单机事务相比基本上没有差别。 但是在互联网领域XA模型的分布式事务实现存在很多局限性在抢购业务这样的高并发大流量场景中更是被完全弃用。我们拿XA分布式协议中最普遍的两阶式提交方案来说明为什么XA模型并不适合互联网场景。 性能问题。在两段式提交的执行过程中所有参与节点都是事务阻塞型的需要长时间锁定资源。这会导致系统整体的并发吞吐量变低在抢购业务中是不可接受的。单点故障问题。事务协调者在链路中有着至关重要的作用一旦协调者发生故障参与者会一直阻塞下去整个系统将无法工作因此需要投入巨大的精力来保障事务协调者的高可用性。数据不一致问题。在阶段二中如果协调者向参与者发送commit请求之后发生了网络异常会导致只有一部分参与者接收到了commit请求没有接收到commit请求的参与者最终会执行回滚操作从而造成数据不一致现象。在抢购业务中这样的数据不一致有可能会对企业或消费者造成巨大的经济损失。因此XA模型是一个理想化的分布式事务模型并没有考虑到高并发、网络故障等实际因素即便是在两阶段提交的基础上诞生了三阶段提交这样的实现方式也没有办法从根本上解决性能和数据不一致的问题。 柔性事务 针对传统分布式事务方案在互联网领域的局限性业界提出了CAP理论以及BASE理论在此基础上诞生了在大型互联网应用中广泛使用的柔性事务。柔性事务的核心思想是放弃传统分布式事务中对于严格一致性的要求允许在事务执行过程中存在数据不一致的中间状态在业务上需要容忍中间状态的存在。柔性事务会提供完善的机制保证在一段时间的中间状态后系统能走向最终一致状态。 遵循BASE理论的柔性事务放弃了隔离性减小了事务中锁的粒度使得应用能够更好的利用数据库的并发性能实现吞吐量的线性扩展。异步执行方式可以更好地适应分布式环境在网络抖动、节点故障的情况下能够尽量保障服务的可用性。因此在高并发、大流量的抢购业务中柔性事务是最佳的选择。 传统分布式事务 柔性事务 业务改造 无 有 一致性 强一致性 最终一致 回滚 支持 实现回退接口 隔离性 支持 放弃隔离性或实现资源锁定接口 并发性能 低 高 适合场景 低并发、短事务 高并发、长事务 柔性事务有多种实现方式包括TCC、Saga、事务消息、最大努力通知等本文将重点介绍通过事务消息实现柔性事务。 事务消息原理分析 抢购业务场景拆解 我们结合抢购业务的真实场景分析如何通过事务消息实现分布式一致性。在抢购业务中有两个非常关键的阶段需要引入分布式事务机制分别是订单创建阶段和付款成功阶段。 从字面含义来看抢购业务就隐含了一个重要的前提库存是有限的。因此在订单创建的时候需要预先检查库存情况并相对应的库存进行锁定以防止商品超卖。如果库存锁定操作失败代表库存不足必须确保订单不能被成功创建。在锁定库存后如果因为某种异常情况导致订单创建失败必须及时将之前锁定的库存进行释放操作以便让其他用户可以重新争夺对应的商品。 如果抢购系统实现了购物车机制在订单创建的同时则需要从购买车中将相应的条目删除。 基于微服务架构的业务拆分订单创建阶段的3个行为很有可能通过3个不同的微服务应用完成因此需要通过分布式事务来保证数据一致性。 订单创建完成后会等待用户付款一旦付款成功就会触发付款成功阶段的执行逻辑。这个阶段同样是通过分布式事务来完成包含修改订单状态、扣减库存、通知发货、增加积分这4个子事务它们要么全部不执行要么全部执行成功。 当然在真实的抢购业务中情况有可能会更加的复杂本文列出的只是其中最具代表性的几类业务行为。 引入消息异步通知机制 传统的分布式事务存在一个很大的弊端是参与节点都是事务阻塞型的需要长时间锁定资源。以锁定库存 -创建订单这个流程为例借助于Redis等缓存系统单纯锁定库存的操作只需要花费毫秒级的时间可以承载非常高的并发量。但如果把创建订单的操作也考虑进来加上不同微服务应用之间相互通讯的时候整体耗时有可能超过1秒导致性能急剧下降。 假设存在一种异步消息机制让分布式事务的第一个参与方在执行完本地事务后通过触发一笔消息通知事务的其他参与方完成后续的操作就能将大事务拆解为多个本地子事务来分开执行。在这种模式下事务的多个参与方之间之间并不需要彼此阻塞和等待就能极大程度地提升并发吞吐能力。对于库存中心而言在高并发场景下只需要不断的执行锁定库存记录操作并不断通过异步消息通知订单中心创建订单只要异步消息机制能确保消息一定送达并得到正确处理就能够实现分布式最终一致性。 先执行本地事务还是先发送异步消息 在这个模型中异步消息的发送交给了分布式事务的第一个参与方来完成这个参与方就拥有了两个职责执行本地事务和发送异步消息。到底应该先执行本地事务还是先发异步消息呢 第一种方案是先发送异步消息再执行本地事务。这样做肯定是不行的如果本地事务没有执行成功异步消息已经发出去了其他事务参与方收到消息后会执行对应的远程事务造成数据不一致。 第二种方案是先执行本地事务再发送异步消息。这样做能够解决本地事务执行失败导致的数据不一致问题因为只有在本地事务执行成功的情况下才会发送异步消息。但如果事务的参与方在执行本地事务成功后自己宕机了就再有没有机会发送异步消息了因此这样做同样会造成数据不一致的问题。记住在真实场景中任何一个应用节点都不是100%可靠的都存在宕机的可能性。 一个可行的方案是引入可以处理事务消息的消息队列集群用于异步消息的中转。一个事务消息包含两种形态首先事务的参与方发送一笔半事务消息到消息队列表示自己即将执行本地事务消息队列集群在收到这个半事务消息后不会马上进行投递而是进行暂存。在执行完本地事务后事务的参考方再发送一笔确认消息到消息队列集群告知本地事务的执行状态。如果本地事务执行成功消息队列集群会将之前收到的半事务消息进行投递如果本地事务执行失败消息队列集群直接删除之前收到的半事务消息这样远程事务就不会被执行从而保证了最终一致性。 同样如果事务参与方在执行完本地事务后宕机了怎么办呢这就需要消息队列集群具备回查机制如果收到半事务消息后在特定时间内没有再收到确认消息会反过来请求事务参与方查询本地事务的执行状态并给予反馈。这样即便错过了确认消息消息队列集群也有能力了解到本地事务的执行状态从而决定是否将消息进行投递。在一个微服务应用中会存在多个对等的应用实例这也就代表着即便一个事务参与方的实例在执行完本地事务后宕机了消息队列集群依然可以通过这个实例的兄弟实例了解到本地事务执行的最终状态。 如何确保远程事务能执行成功 如果一切本地事务的执行以及异步消息的投递都一切顺利的话接下来还会存在另外两种数据不一致的可能性 消息队列集群在将异步消息投递到远程事务参与方的时候由于网络不稳定消息没能投递成功。消息投递成功了但远程事务参与方还没来得及执行远程事务就宕机了。这两种情况都会导致远程事务执行失败所以需要建立一种消息重试机制让远程事务参与方在完成任务后实际上对远程事务参与方而言这个任务是它要执行的本地任务给予消息队列集群一个反馈告知异步消息已经得到了正确的处理。否则消息队列会在一定时间后周期性的重复投递消息直到它收到了来自远程事务参与方的反馈以确保远程事务一定能执行成功。 和事务回查机制类似远程事务参与方也有多个对等的微服务实例即便某个实例在没来得及执行远程事务的时候宕机消息队列也可以将任务交给这个实例的兄弟实例来完成。 完整流程 事务消息实战 了解到事务消息的原理后我们不难得出一个结论消息队列集群在整个流程中起着至关重要的作用如果消息队列集群不可用所有涉及到分布式事务的业务都将中止因此我们需要一个高可用的消息队列集群能够始终保持在工作状态即便其某个组件出现故障也能够在短时间内自动恢复不会影响业务还能确保接收到的消息不丢失。 消息队列RocketMQ 消息队列 RocketMQ 版是阿里云基于 Apache RocketMQ 构建的低延迟、高并发、高可用、高可靠的分布式消息中间件。该产品最初由阿里巴巴自研并捐赠给 Apache 基金会服务于阿里集团 13 年覆盖全集团所有业务包括种类金融级场景。作为双十一交易核心链路的官方指定产品支撑千万级并发、万亿级数据洪峰历年刷新全球最大的交易消息流转记录。 阿里云消息队列RocketMQ提供了对于事务消息机制最完整实现包括半事务消息、确认消息、事务回查机制、消息重试等重要功能。此外消息队列RocketMQ还提供了极强的高可用能力以及数据可靠性可以确保在各种极端场景下都能提供稳定的服务并确保消息不丢失。 对于开发者而言使用云上的消息队列RocketMQ可以免除消息队列集群的搭建和维护工作将更多的精力投入到实现业务逻辑的工作中。当消息队列集群的性能不能满足要求时还可以非常方便的进行集群一键扩容以获得更高的并发吞吐量。 开通RocketMQ服务 在阿里云官方网站开通消息队列服务后方可开始使用消息队列RocketMQ如果使用RAM用户访问RocketMQ还必须先为RAM用户进行授权。在完成阿里云账户注册以及实名认证后打开消息队列RocketMQ版产品页点击免费开通页面跳转至消息队列RocketMQ版控制台在弹出的提示对话框中完成RocketMQ服务的开通。 接下来登录RAM控制台在左侧导航栏选择人员管理 用户在用户页面单击目标RAM用户操作列的添加权限在添加权限面板单击需要授予RAM用户的权限策略单击确定。消息队列RocketMQ提供多种系统策略可以根据权限范围为RAM用户授予相关权限。为了简单起见我们先开通AliyunMQFullAccess权限策略授予该RAM用户所有消息收发权限和控制台所有功能操作权限。 创建资源 在调用SDK收发消息前需在消息队列RocketMQ控制台创建相关资源在调用SDK时需填写这些资源信息。首先我们要创建RocketMQ实例实例是用于消息队列RocketMQ服务的虚拟机资源相当于一个独立的消息队列集群会存储消息Topic和客户端Group ID信息。我们还需要注意只有在同一个地域下的同一个实例中的Topic和Group ID才能互通例如某Topic创建在华东1杭州地域的实例A中那么该Topic只能被在华东1杭州地域的实例A中创建的Group ID对应的生产端和消费端访问。 登录到消息队列RocketMQ控制台在左侧导航栏单击实例列表在顶部菜单栏选择地域如华东1杭州在实例列表页面单击创建实例在创建 RocketMQ 实例面板完成实例的创建。 接下来在实例所在页面的左侧导航栏单击Topic 管理。在Topic 管理页面单击创建 Topic在创建 Topic面板输入名称和描述选择该Topic的消息类型为事务消息完成Topic的创建。 Topic是消息队列RocketMQ版里对消息的一级归类例如创建Topic_Trade这一Topic来识别交易类消息消息生产者将消息发送到Topic_Trade而消息消费者则通过订阅该Topic来获取和消费消息。 创建完实例和Topic后需要为消息的消费者和或生产者创建客户端ID即Group ID作为标识。在事务消息的场景中需要创建2个不同的Group ID分别代表本地事务参与方和远程事务参与方。在实例所在页面的左侧导航栏单击Group 管理在Group 管理页面选择TCP 协议 创建 Group ID在创建可用于 TCP 协议的 Group面板完成本地事务客户端Group ID的创建。重复此操作完成远程事务参与方Group ID的创建。 本地事务参与方的业务代码 本文将通过Java代码介绍如何实现事务消息相关的业务逻辑为了简化业务逻辑我们继续基于锁定库存 -  创建订单这个流程来演示在这个流程中仅有2个事务参与方。 初始化TransactionProducer 我们先通过Maven引入消息队列RocketMQ的SDK优先使用阿里云官方提供的TCP版SDK。 dependencygroupIdcom.aliyun.openservices/groupIdartifactIdons-client/artifactIdversion1.8.7.2.Final/version /dependency 顺利引入Log4j2用于日志输出。 dependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion1.7.7/version /dependency dependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-slf4j-impl/artifactIdversion2.13.1/version /dependency 在库存中心的代码中我们需要初始化一个TransactionProducer用于异步消息的发送需要填入如下信息 Group ID之前创建的用于本地事务参与方的Group ID。Access key和Secret KeyRAM用户对应的密钥信息从RAM用户控制台获得。Nameserver AddressRocketMQ实例的接入点信息从RocketMQ控制台获得。 Properties properties new Properties(); // 您在控制台创建的Group ID。注意事务消息的Group ID不能与其他类型消息的Group ID共用。 properties.put(PropertyKeyConst.GROUP_ID, XXX); // AccessKey ID阿里云身份验证在阿里云RAM控制台创建。 properties.put(PropertyKeyConst.AccessKey, XXX); // AccessKey Secret阿里云身份验证在阿里云RAM控制台创建。 properties.put(PropertyKeyConst.SecretKey, XXX); // 设置TCP接入域名进入消息队列RocketMQ版控制台的实例详情页面的TCP协议客户端接入点区域查看。 properties.put(PropertyKeyConst.NAMESRV_ADDR, XXX); // LocalTransactionCheckerImpl本地事务回查类的实现 TransactionProducer producer ONSFactory.createTransactionProducer(properties,new LocalTransactionCheckerImpl()); producer.start(); TransactionProducer是线程安全的启动后能在多线程环境中复用。 获取全局唯一的交易流水号 在发送半事务消息以及执行本地事务之前我们需要先获取一个全局唯一的交易流水号订单与交易流水号一一对应接下来的事务消息机制都会依赖于这个这个交易流水号。我们可以通过引入第三方ID生成组件或者在本地通过Snowflake算法实现。 实现本地事务回查逻辑 创建一个实现了LocalTransactionChecker接口的 LocalTransactionCheckerImpl类实现其中的check(Message)方法该方法返回本地事务的最终状态。至于具体的业务逻辑如何实现不在本文讨论的范围之前我们将其封装在BusinessService类中。 package transaction;import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker; import com.aliyun.openservices.ons.api.transaction.TransactionStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class LocalTransactionCheckerImpl implements LocalTransactionChecker {private static Logger LOGGER LoggerFactory.getLogger(LocalTransactionCheckerImpl.class);private static BusinessService businessService new BusinessService();Overridepublic TransactionStatus check(Message msg) {// 从消息体中获得的交易IDString transactionKey msg.getKey();TransactionStatus transactionStatus TransactionStatus.Unknow;try {boolean isCommit businessService.checkbusinessService(transactionKey);if (isCommit) {transactionStatus TransactionStatus.CommitTransaction;} else {transactionStatus TransactionStatus.RollbackTransaction;}} catch (Exception e) {LOGGER.error(Transaction Key:{}, transactionKey, e);}LOGGER.warn(Transaction Key:{}transactionStatus:{}, transactionKey, transactionStatus.name());return transactionStatus;} }执行本地事务并发送异步消息 我们先组装一条异步消息其中包含了全局交易ID消息将要发往的Topic以及消息体。远程事务参与方将通过这个消息体中获取执行远程事务所必须的数据信息。 接下来将这条异步消息连同一个实现了LocalTransactionExecuter接口的匿名类通过send方法进行发送这就是本地事务参与方所需要实现的所有业务代码了。当然这个匿名类实现了TransactionStatus execute.execute()方法其中包含了对于本地事务的执行。完整代码如下 package transaction;import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.transaction.TransactionProducer; import com.aliyun.openservices.ons.api.transaction.TransactionStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.util.Properties; import java.util.concurrent.TimeUnit;public class TransactionProducerClient {private static Logger LOGGER LoggerFactory.getLogger(TransactionProducerClient.class);private static final BusinessService businessService new BusinessService();private static final String TOPIC create_order;private static final TransactionProducer producer null;static {Properties properties new Properties();// 您在控制台创建的Group ID。注意事务消息的Group ID不能与其他类型消息的Group ID共用。properties.put(PropertyKeyConst.GROUP_ID, XXX);// AccessKey ID阿里云身份验证在阿里云RAM控制台创建。properties.put(PropertyKeyConst.AccessKey, XXX);// AccessKey Secret阿里云身份验证在阿里云RAM控制台创建。properties.put(PropertyKeyConst.SecretKey, XXX);// 设置TCP接入域名进入消息队列RocketMQ版控制台的实例详情页面的TCP协议客户端接入点区域查看。properties.put(PropertyKeyConst.NAMESRV_ADDR, XXX);// LocalTransactionCheckerImpl本地事务回查类的实现TransactionProducer producer ONSFactory.createTransactionProducer(properties,new LocalTransactionCheckerImpl());producer.start();}public static void main(String[] args) throws InterruptedException {String transactionKey getGlobalTransactionKey();String messageContent String.format(lock inventory for: %s, transactionKey);Message message new Message(TOPIC, null, transactionKey, messageContent.getBytes());try {SendResult sendResult producer.send(message, (msg, arg) - {// 此处用Lambda表示实际是实现TransactionStatus execute(final Message msg, final Object arg)方法TransactionStatus transactionStatus TransactionStatus.Unknow;try {boolean localTransactionOK businessService.execbusinessService(transactionKey);if (localTransactionOK) {transactionStatus TransactionStatus.CommitTransaction;} else {transactionStatus TransactionStatus.RollbackTransaction;}} catch (Exception e) {LOGGER.error(Transaction Key:{}, transactionKey, e);}LOGGER.warn(Transaction Key:{}, transactionKey);return transactionStatus;}, null);LOGGER.info(send message OK, Transaction Key:{}, result:{}, message.getKey(), sendResult);} catch (Exception e) {LOGGER.info(send message failed, Transaction Key:{}, message.getKey());}// demo example防止进程退出TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);}private static String getGlobalTransactionKey() {// TODOreturn ;} } 得益于RocketMQ SDK优秀的封装发送半事务消息、发送确认消息、事务回查等重要步骤都已经完整实现不需要开发者再编写代码了这将为用户带来特别顺畅开发体验。 远程事务参与方的业务代码 相对本地事务参与方而言远程事务参与方的代码更加简单只需要从异步消息中提取出对应信息完成对远程事务的执行即可。 package transaction;import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.util.Properties;public class TransactionConsumerClient {private static Logger LOGGER LoggerFactory.getLogger(TransactionProducerClient.class);private static final BusinessService businessService new BusinessService();private static final String TOPIC create_order;private static final Consumer consumer null;static {Properties properties new Properties();// 在控制台创建的Group ID不同于本地事务参与方使用的Group IDproperties.put(PropertyKeyConst.GROUP_ID, XXX);// AccessKey ID阿里云身份验证在阿里云RAM控制台创建。properties.put(PropertyKeyConst.AccessKey, XXX);// Accesskey Secret阿里云身份验证在阿里云服RAM控制台创建。properties.put(PropertyKeyConst.SecretKey, XXX);// 设置TCP接入域名进入控制台的实例详情页面的TCP协议客户端接入点区域查看。properties.put(PropertyKeyConst.NAMESRV_ADDR, XXX);Consumer consumer ONSFactory.createConsumer(properties);consumer.start();}public static void main(String[] args) {consumer.subscribe(TOPIC, *, (message, context) - {LOGGER.info(Receive: message);businessService.doBusiness(message);// 返回CommitMessage代表给予消息队列集群异步消息已经得到正常处理的回馈return Action.CommitMessage;});} } 事务回滚 是否存在这样的情况当本地事务执行成功后因为远程事务没有办法执行而导致本地事务需要进行回滚操作呢在事务消息原理分析一节我们已经介绍过如何通过消息重试确保远程事务能够执行成功这是不是已经说明只要异步消息被确认远程事务就一定可以执行成功从而不存在对本地事务的回滚呢 实际生产情况下确实存在远程事务无法正常执行的情况。比如在付款成功阶段当本地事务“修改订单状态”执行完成后在执行远程事务“通知发货”的时因为订单地址有误而被物流公司拒绝这种情况下就必须对订单状态进行回退操作并发起退款流程。 所以在执行远程事务的时候我们有必要区分如下两种完全不同的异常 技术异常远程事务参与方宕机、网络故障、数据库故障等。业务异常远程逻辑在业务上无法执行、代码业务逻辑错误等。简单来讲当远程事务执行失败的时候能够通过消息重试的方式解决问题的属于技术异常否则属于业务异常。基于事务消息的分布式事务机制不能实现自动回滚当业务异常发生的时候必须通过回退流程确保已经完成的本地事务得到恢复。比如在修改订单状态 - 通知发货这个场景中如果由于业务异常导致无法发货的时候需要通过额外的回退流程将订单状态设置为“已取消”并执行退款流程。 在事务消息机制中回退流程相当于远程事务参与方和本地事务参与方调换了角色和正常流程一样同样也可以通过事务消息来完成分布式事务。由于正常流程和回退流程的业务逻辑是完全不一样的所以最理想的方式是建立另外一个Topic来实现。这也就说明我们在创建事务消息Topic的时候要充分考虑到这个Topic背后的业务含义并在Topic命名上尽可能的与真实业务相匹配。 多个事务参与方 本节展示的示例中都只涉及到2个事务参与方但在真实世界中分布式事务往往涉及到更多的事务参与方比如之前提到的付款成功环节有修改订单状态-扣减库存-通知发货-增加积分这4个需要同时进行的操作涉及到4个事务参与方。这种情况下如何通过事务消息来实现分布式事务呢 我们依然可以继续使用之前的架构只需要加入多个远程事务参与方就行了。可以通过RocketMQ的多消费组关联多个远程事务参与方每一个参与方对应一个Group ID在这种情况下同一个异步消息会复制成多份投递给不同的事务参与方。 需要特别引起注意的是当某个远程事务参与方遇到业务异常的时候需要通知其他所有事务参与方执行回退流程这无疑会增加业务逻辑的整体复杂度。为了简化事务消息的执行流程我们可以对业务逻辑预先进行梳理将子事务分为如下两类 有可能发生业务异常的比如锁定库存的操作有可能因为库存不足而执行失败。又比如扣除积分的操作有可能因为用户积分不足而无法扣除。不太可能发生业务异常的比如删除购物车条目的操作除非是技术类故障一定可以执行成功即便对应的条目并不存在也没有关系。又比如积分增加的操作只要对应的用户没有注销是不可能遇到业务异常的。我们尽量将第一类事务作为本地事务而实现将第二类事务作为远程事务而实现这样就可以最大程度避免回退流程。 其他注意事项 消息幂等 RocketMQ能保证消息不丢失但不能保证消息不重复所以消费者在接收到消息后有必要根据业务上的唯一Key对消息做幂等处理。在抢购业务中唯一Key当然就是全局唯一的交易流水号具体幂等处理方法在互联网上有很多文章供读者参考。当然不是每一种业务远程事务都需要确保消息的幂等性比如删除购物车指定条目这样的操作在业务上是可以容忍多次反复执行的就没有必要引入额外的幂等处理了。 每日对账 不同于传统事务的强一致性保证柔性事务需要经历一个中间状态才到达成事务的最终一致性。有某些特殊情况下这个中间状态会持续非常长的时间甚至需要人工主动介入才能实现最终一致性 消息重试多次后依然不成功当消费者完全无法正常工作的时候RocketMQ不可能永无止境地重试消息事实上如果16次重试后异步消息依然没有办法被正常处理RocketMQ会停止尝试将消息放到一个特殊的队列中。未处理的业务异常比如给某个账号加积分的时候发现此账号被注销了这是一个非常罕见的业务现象有可能事先对此并没有健壮的处理机制。幂等校验失败处理幂等所依赖的系统比如Redis发生了故障导致某些消息被重复处理。其他严重的系统故障比如网络长时间中断留下了大量执行到一半的事务。其他漏网之鱼。 这些情况下我们都有需要通过定期对账机制来进行排查在必要的时候发起人工主动介入流程修复不一致的数据。事实上在任何柔性事务的实现中每日对账都是必不可少的数据安全保障性手段。 总结 在柔性事务的多种实现中事务消息是最为优雅易用的一种。基于阿里云RocketMQ高性能、高可用的特点完全可以胜任抢购业务这类高并发大流量的场景。在阿里巴巴自身的业务中事务消息也广泛使用于双11这样的大型营销活动中有着非常高的通用性。 但在IT领域没有任何一种技术是银弹引入事务消息机制需要针对性的修改业务逻辑还需要借助于每日对账等额外的手段确保数据安全在实现高性能的同时也增加了整体的业务复杂度。我们需要对业务场景进行充分评估对比多种不同的技术实现方案从中挑选与自身业务特点最为匹配的一种才能更好地发挥柔性事务的价值。 作者山猎阿里云解决方案架构师 原文链接 本文为阿里云原创内容未经允许不得转载
http://www.zqtcl.cn/news/470558/

相关文章:

  • 学院网站板块盘多多搜索引擎入口
  • 网站seo内部优化wordpress建站网站报错
  • 网站建设科技国外网站入口
  • 怎样用网站做淘宝推广免费的项目管理软件
  • 共青城网站建设微网站开发报价
  • 网站建设选超速云建站网站建设公司比较
  • 芜湖网络科技有限公司沈阳网站推广优化公司哪家好
  • 自己制作图片文字图片网站建设和优化内容最重要性
  • 邢台做网站优化建筑行业新闻资讯
  • 站长统计app最新版本2023网站标题是关键词吗
  • 中山精品网站建设市场wordpress登陆phpadmin
  • 泸县手机网站建设佛山城市建设工程有限公司
  • 长沙网站推广排名优化wordpress主题字体更改
  • 深圳网站建设软件定制公司房地产开发公司注册资金要求
  • 个人如何在企业网站做实名认证房地产平面设计主要做什么
  • 网站做字工具WordPress搜索功能增强
  • 慢慢来做网站多少钱wordpress优化搜索引擎
  • 网页 网站 区别现在装宽带要多少钱
  • 黄金网站下载免费建设个人网站需要什么条件
  • 网站开发人员岗位职责网站维护报价单
  • 免费正能量不良网站推荐自建网站视频教程
  • 厦门物流网站建设南京宜电的网站谁做的
  • vps 网站备案手机界面设计素材
  • seo排名影响因素主要有灯塔seo
  • 济南哪家做网站小勇cms网站管理系统
  • sns社交网站注册做网站 提交源码 论坛
  • wordpress网站编辑semir是什么牌子
  • 做区块链的网站教育培训机构平台
  • 系统网站怎么做的seo竞争对手分析
  • 菏泽网站建设菏泽众皓网页开发工资