上海网站营销是什么,cms官方网站,如何做网站ip跳转,网站关键字统计前面对于分布式事务也讲了好几篇了#xff08;可靠消息最终一致性分布式事务 - TCC分布式事务 - 2PC、3PChttps://github.com/kklldog/AgileDT 开源不易#xff0c;大家多多 ✨✨✨回顾 前面一篇文章(可靠消息最终一致性 )我们详细介绍了基于可靠消息的分布式事务。为了更好的… 前面对于分布式事务也讲了好几篇了可靠消息最终一致性分布式事务 - TCC分布式事务 - 2PC、3PChttps://github.com/kklldog/AgileDT 开源不易大家多多 ✨✨✨回顾 前面一篇文章(可靠消息最终一致性 )我们详细介绍了基于可靠消息的分布式事务。为了更好的理解 AgileDT 的代码我们还是有必要简单的来回顾下。该方案总体流程上可分为以下步骤主动方在真正的业务开始前先向可靠消息服务发送一个“待确认”的消息可靠消息服务收到待确认消息后持久化消息到数据库如果以上操作成功则主动方开始真正的业务如果失败则直接放弃执行业务如果业务执行成功则发送“确认”消息给可靠消息服务如果执行失败则发送“取消”给可靠消息服务。如果可靠消息服务收到“确认”消息则更新数据库里的消息记录的状态为“待发送”如果收到的消息为“取消”则更新消息状态为“已取消”如果上一步更新的数据库为“待发送”那么会开始往MQ投递消息并且更改数据库里的消息记录的状态为“已发送”上一步往MQ投递消息成功后MQ会给被动方推送消息。被动方收到消息后开始处理业务如果业务处理成功则被动方对MQ进行ACK回复则这条消息会从MQ内移除掉如果业务处理成功则发送“已完成”消息给可靠消息服务可靠消息服务收到“已完成”消息后更新数据库消息记录未“已完成”废话不多说了下面让我们演示下如何使用 AgileDT 来快速实现一个基于可靠消息的分布式事务。以下我们还是以经典的订单下单完成给会员赠送积分的场景来演示。使用 AgileDT依赖组件 mysqlrabbitmq目前支持 mysql 数据库但是数据访问组件使用的是 freesql 所以后续要实现支持别的数据库也很简单。目前框架使用的可靠消息服务为 rabbitmq 。运行服务端 在服务新建一个数据库并且新建一张表// crate event_message table on mysql
create table if not exists event_message
(event_id varchar(36) not nullprimary key,biz_msg varchar(4000) null,status enum(Prepare, Done, WaitSend, Sent, Finish, Cancel) not null,create_time datetime(3) null,event_name varchar(255) null
);使用docker-compose运行服务端version: 3 # optional since v1.27.0
services:agile_dt:image: kklldog/agile_dtports:- 5000:5000environment:- db:providermysql- db:conn Databaseagile_dt;Data Source192.168.0.115;User Idroot;Passwordmdsd;port3306- mq:userNameadmin- mq:password123456- mq:host192.168.0.115- mq:port5672安装客户端 在主动方跟被动方都需要安装AgileDT的客户端库Install-Package AgileDT.Client主动方使用方法 在业务数据库添加事务消息表// crate event_message table on mysql
create table if not exists event_message
(event_id varchar(36) not nullprimary key,biz_msg varchar(4000) null,status enum(Prepare, Done, WaitSend, Sent, Finish, Cancel) not null,create_time datetime(3) null,event_name varchar(255) null
);修改配置文件在appsettings.json文件添加以下节点agiledt: {server: http://localhost:5000,db: {provider: mysql,conn: Databaseagile_order;Data Source192.168.0.125;User Iddev;Passworddev123f;port13306//conn: Databaseagile_order;Data Source192.168.0.115;User Idroot;Passwordmdsd;port3306},mq: {host: 192.168.0.125,//host: 192.168.0.115,userName: admin,password: 123456,port: 5672}}注入 AgileDT 客户端服务public void ConfigureServices(IServiceCollection services){services.AddAgileDT();...}实现IEventService方法处理主动方业务逻辑的类需要实现IEventService接口并且标记那个方法是真正的业务方法。AgileDT在启动的时候会扫描这些类型并且使用AOP技术生成代理类在业务方法前后插入对应的逻辑来跟可靠消息服务通讯。这里要注意的几个地方实现IEventService接口使用DtEventBizMethod注解标记业务入口方法使用DtEventName注解来标记事务的方法名称如果不标记则使用类名注意业务方法最终一定要使用事务来同步修改消息表的status字段为done状态这个操作框架没办法帮你实现注意业务方法如果失败请抛出Exception如果不抛异常框架一律认为执行成功public interface IAddOrderService:IEventService{bool AddOrder(Order order);}[DtEventName(orderservice.order_added)]public class AddOrderService : IAddOrderService{private readonly ILoggerAddOrderService _logger;public AddOrderService(ILoggerAddOrderService logger){_logger logger;}public string EventId { get;set;}[DtEventBizMethod]public virtual bool AddOrder(Order order){var ret false;//3. 写 Order 跟 修改 event 的状态必选写在同一个事务内FreeSQL.Instance.Ado.Transaction(() {order.EventId EventId;//在订单表新增一个eventid字段使order跟event_message表关联起来var ret0 FreeSQL.Instance.Insert(order).ExecuteAffrows();var ret1 FreeSQL.Instance.UpdateOrderService.Data.entities.EventMessage().Set(x x.Status, MessageStatus.Done).Where(x x.EventId EventId).ExecuteAffrows();ret ret0 0 ret1 0;});return ret;}/// summary/// 构造后续业务处理需要的消息内容/// /summary/// returns/returnspublic string GetBizMsg(){//这里可以构造传递到MQ的业务消息的内容比如传递订单编号啊 ,以便后续的被动方处理业务时候使用var order FreeSQL.Instance.SelectOrder().Where(x x.EventId EventId).First();return order?.Id;}}在实现好 IAddOrderService 接口后你可以像平常一样使用 IAddOrderService 来注入实现类。比如在 Controller 的构造函数注入进去。因为 AgileDT 在启动的时候会自动帮你注册。注意IAddOrderService 跟实现类的生命周期是 Scoped 。被动方使用方法 在业务方数据库建表或者在业务表上加字段对于被动方来说这里不是必须要建一个表。但是至少要有个地方来存储event_id的信息最简单的是直接在业务主表上加event_id字段。修改配置文件在appsettings.json文件添加以下节点agiledt: {server: http://localhost:5000,db: {provider: mysql,conn: Databaseagile_order;Data Source192.168.0.125;User Iddev;Passworddev123f;port13306//conn: Databaseagile_order;Data Source192.168.0.115;User Idroot;Passwordmdsd;port3306},mq: {host: 192.168.0.125,//host: 192.168.0.115,userName: admin,password: 123456,port: 5672}}注入AgileDT服务public void ConfigureServices(IServiceCollection services){services.AddAgileDT();...}实现IEventMessageHandler接口被动方需要接收MQ投递过来的消息这些处理类需要实现IEventMessageHandler接口。AgileDT启动的时候会去扫描这些类然后跟MQ建立绑定关系。这里必须使用DtEventName注解标记需要处理的事件名称Reveive 方法必须是幂等的public interface IOrderAddedMessageHandler: IEventMessageHandler{}[DtEventName(orderservice.order_added)]public class OrderAddedMessageHandler: IOrderAddedMessageHandler{static object _lock new object();public bool Receive(EventMessage message){var bizMsg message.BizMsg;var eventId message.EventId;string orderId bizMsg;lock (_lock){var entity FreeSQL.Instance.SelectPointHistory().Where(x x.EventId eventId).First();if (entity null){var ret FreeSQL.Instance.Insert(new PointHistory{Id Guid.NewGuid().ToString(),EventId message.EventId,OrderId orderId,Points 20,CreateTime DateTime.Now}).ExecuteAffrows();return ret 0;}else{return true;}}}}总结 通过以上演示我们快速的实现了一个订单下单会员赠送积分的服务。可以看到使用 AgileDT 可以很快速的实现一个分布式事务。特别是在实现过一个分布式事务后后面实现起来就特别简单只要实现几个接口就可以了。AgileDT 才刚刚起步希望大家多多支持多多✨✨✨ 多多 PR 。https://github.com/kklldog/AgileDT.Net Core with 微服务 - 可靠消息最终一致性分布式事务.Net Core with 微服务 - 分布式事务 - TCC.Net Core with 微服务 - 分布式事务 - 2PC、3PC