手表网站制作照片,网站建站免费,百度通用网址,广州做外贸网站公司消息队列#xff08;Message Queue#xff09;是分布式系统必不可少的中间件#xff0c;大部分消息队列产品#xff08;如RocketMQ/RabbitMQ/Kafka等#xff09;要求团队有比较强的技术实力#xff0c;不适用于中小团队#xff0c;并且对.NET技术的支持力度不够。而Redi… 消息队列Message Queue是分布式系统必不可少的中间件大部分消息队列产品如RocketMQ/RabbitMQ/Kafka等要求团队有比较强的技术实力不适用于中小团队并且对.NET技术的支持力度不够。而Redis实现的轻量级消息队列很简单仅有Redis常规操作几乎不需要开发团队掌握额外的知识随着强大的.NET5发布.NET技术栈里面怎可没有最佳的消息队列搭档本文从高性能Redis组件 NewLife.Redis 出发借用快递业务场景讲解.NET中如何使用Redis作为消息队列搭建企业级分布式系统架构什么是消息队列消息队列就是消息在传输过程中保存消息的容器其核心功用是削峰和解耦早高峰快递公司的货车前来各驿站卸货多名站点工作人员使用PDA扫描到站大量信息进入系统1000tps而通知快递公司的接口只有400tps的处理能力。通过增加MQ来保存消息让超过系统处理能力的消息滞留下来等早高峰过后系统即可完成处理。此为削峰在快递柜业务流程中快递员投柜后需要经历扣减系统费、短信通知用户和推送通知快递公司三个业务动作。传统做法需要依次执行这些业务东西如果其中某一步异常例如用户手机未开机或者快递公司接口故障将会延迟甚至中断整个投柜流程严重影响用户体验。如果接口层收到投柜数据后写入消息到MQ后续三个子系统各自消费处理将可以完美解决该问题并且子系统故障不影响上游系统此为解耦内存消息队列最简单的消息队列可以由阻塞集合BlockingCollection实现public static void Start()
{var queue new BlockingCollectionArea();// 独立线程消费var thread new Thread(s Consume(queue));thread.Start();// 发布消息Public(queue);
}
private static void Public(BlockingCollectionArea queue)
{var area new Area { Code 110000, Name 北京市 };XTrace.WriteLine(Public {0} {1}, area.Code, area.Name);queue.Add(area);Thread.Sleep(1000);area new Area { Code 310000, Name 上海市 };XTrace.WriteLine(Public {0} {1}, area.Code, area.Name);queue.Add(area);Thread.Sleep(1000);area new Area { Code 440100, Name 广州市 };XTrace.WriteLine(Public {0} {1}, area.Code, area.Name);queue.Add(area);Thread.Sleep(1000);
}
private static void Consume(BlockingCollectionArea queue)
{while (true){var msg queue.Take();if (msg ! null){XTrace.WriteLine(Consume {0} {1}, msg.Code, msg.Name);}}
}每秒钟生产一个消息都被独立线程消费到。Redis做消息队列Redis的LIST结构具备左进右出的功能再使用BRPOP的阻塞弹出即可完成一个最基本的消息队列 RedisQueueT。GetQueue取得队列后Add方法发布消息。TakeOne拉取消费一条消息指定10秒阻塞10秒内有消息立马返回否则等到10秒超时后返回空。public static void Start(FullRedis redis)
{var topic EasyQueue;// 独立线程消费var thread new Thread(s Consume(redis, topic));thread.Start();// 发布消息Public(redis, topic);
}
private static void Public(FullRedis redis, String topic)
{var queue redis.GetQueueArea(topic);queue.Add(new Area { Code 110000, Name 北京市 });Thread.Sleep(1000);queue.Add(new Area { Code 310000, Name 上海市 });Thread.Sleep(1000);queue.Add(new Area { Code 440100, Name 广州市 });Thread.Sleep(1000);
}
private static void Consume(FullRedis redis, String topic)
{var queue redis.GetQueueArea(topic);while (true){var msg queue.TakeOne(10);if (msg ! null){XTrace.WriteLine(Consume {0} {1}, msg.Code, msg.Name);}}
}LPUSH 生产消息插入列表BRPOP 消费消息弹出列表因此消息被消费后就消失了从日志时间可以看到生产与消费的时间差在1~3ms之间延迟极低注释消费代码后重跑可以在Redis中看到发布的消息需要确认的队列如果通知快递公司的物流推送子系统处理消息时出错消息丢失怎么办显然不可能让上游再发一次这里我们需要支持消费确认的可信队列 RedisReliableQueueT。消费之后除非程序主动确认消费否则Redis不许删除消息。GetReliableQueue获取队列实例后Add发布消息TakeOneAsync异步消费一条消息并指定10秒阻塞超时处理完成后再通过Acknowledge确认。public static void Start(FullRedis redis)
{var topic AckQueue;// 独立线程消费var source new CancellationTokenSource();Task.Run(() ConsumeAsync(redis, topic, source.Token));// 发布消息Public(redis, topic);source.Cancel();
}
private static void Public(FullRedis redis, String topic)
{var queue redis.GetReliableQueueArea(topic);queue.Add(new Area { Code 110000, Name 北京市 });Thread.Sleep(1000);queue.Add(new Area { Code 310000, Name 上海市 });Thread.Sleep(1000);queue.Add(new Area { Code 440100, Name 广州市 });Thread.Sleep(1000);
}
private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token)
{var queue redis.GetReliableQueueString(topic);while (!token.IsCancellationRequested){var mqMsg await queue.TakeOneAsync(10);if (mqMsg ! null){var msg mqMsg.ToJsonEntityArea();XTrace.WriteLine(Consume {0} {1}, msg.Code, msg.Name);queue.Acknowledge(mqMsg);}}
}LPUSH 生产消息插入列表BRPOPLPUSH 消费消息弹出列表并插入另一个Ack列表这是确保不丢消息的关键。LREM 从Ack列表删除用于消费完成后确认。如果消费异常就不会执行该确认操作滞留在Ack列表的消息60秒后重新回来主列表。脑筋急转弯如果应用进程异常退出未确认的消息该怎么处理 注释消费代码后重跑可以在Redis中看到发布的消息跟普通队列一样使用了LIST结构处理“北京市”消息时如果没有Acknowledge确认Redis里面将会看到一个名为AckQueue:Ack:*的LIST结构里面保存这这一条消息。所以可信队列本质上就是在消费时同步把消息备份到另一个LIST里面确认操作就是从待确认LIST里面删除。自从有了这个可信队列基本上足够满足90%以上业务需求。延迟队列某一天小马哥说快递员投柜一定时间时候如果用户没有来取件那么系统需要收取超期取件费需要一个延迟队列。于是想到了Redis的ZSET我们再来一个 RedisDelayQueueTAdd生产消息时多了一个参数指定若干秒后可以消费到该消息消费用法跟可信队列一样。public static void Start(FullRedis redis)
{var topic DelayQueue;// 独立线程消费var source new CancellationTokenSource();Task.Run(() ConsumeAsync(redis, topic, source.Token));// 发布消息Public(redis, topic);source.Cancel();
}
private static void Public(FullRedis redis, String topic)
{var queue redis.GetDelayQueueArea(topic);queue.Add(new Area { Code 110000, Name 北京市 }, 2);Thread.Sleep(1000);queue.Add(new Area { Code 310000, Name 上海市 }, 2);Thread.Sleep(1000);queue.Add(new Area { Code 440100, Name 广州市 }, 2);Thread.Sleep(1000);
}
private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token)
{var queue redis.GetDelayQueueString(topic);while (!token.IsCancellationRequested){var mqMsg await queue.TakeOneAsync(10);if (mqMsg ! null){var msg mqMsg.ToJsonEntityArea();XTrace.WriteLine(Consume {0} {1}, msg.Code, msg.Name);queue.Acknowledge(mqMsg);}}
}上图可以看到每秒生产一个消息2秒后消费到北京市再过1秒消费到上海市距离上海市的发布刚好2秒。这里少了广州市因为测试程序在生产广州市后只等了1秒就退出。我们从Redis中可以看到广州市这一条消息存放在ZSET结构中。多消费组可重复消费的队列又一天数据中台的小伙伴想要消费订单队列但是不能够啊LIST结构做的队列每个消息只能被消费一次如果数据中台的系统消费掉了其它业务系统就会失去消息。我们想到了Redis5.0开始新增的STREAM结构再次封装RedisStream。public static void Start(FullRedis redis)
{var topic FullQueue;var queue redis.GetStreamString(topic);// 独立线程消费var source new CancellationTokenSource();Task.Run(() ConsumeAsync(redis, topic, source.Token));// 发布消息Public(redis, topic);//source.Cancel();
}
private static void Public(FullRedis redis, String topic)
{var queue redis.GetStreamArea(topic);queue.Add(new Area { Code 110000, Name 北京市 });Thread.Sleep(1000);queue.Add(new Area { Code 310000, Name 上海市 });Thread.Sleep(1000);queue.Add(new Area { Code 440100, Name 广州市 });Thread.Sleep(1000);
}
private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token)
{var queue redis.GetStreamString(topic);queue.Group test;queue.GroupCreate(queue.Group);while (!token.IsCancellationRequested){try{var mqMsg await queue.TakeMessageAsync(10);if (mqMsg ! null){var msg mqMsg.GetBodyArea();XTrace.WriteLine(Consume {0} {1}, msg.Code, msg.Name);queue.Acknowledge(mqMsg.Id);}}catch (Exception ex){XTrace.WriteException(ex);}}
}生产过程不变消费大循环有点特别主要是STREAM消费回来的消息有它自己的Id只需要对这个Id确认就可以了。上图中红色框是生产紫色框是消费。再来看看Redis中可以看到STREAM消息还在里面。数据中台组只需要使用不同的消费组Group即可独立消费不用担心抢其它系统消息啦。最佳实践RedisQueue在中通大数据分析中用于缓冲等待写入Oracle/MySql的数据多线程计算后写入队列然后由专门线程定时拉取一批500行执行批量Insert/Update操作。该系统队列每天10亿条消息Redis内存分配8G实际使用小于100M除非消费端故障导致产生积压。递易智能科技全部使用可信队列 RedisReliableQueue约200多个队列按系统分布在各自的Redis实例公有云2G内存主从版。积压消息小于10万时队列专用的Redis实例内存占用小于100M几乎不占内存空间。公司业务每天带来100万多订单由此衍生的消息数约1000万条从未丢失消息例程代码代码https://github.com/NewLifeX/NewLife.Redis/tree/master/QueueDemo