交易类网站做支付宝功能,安装wordpress 000,公司网站建设详细方案,孝感房地产网站建设这是年中首发在博客园上的文章#xff0c;个人觉得是AspNetCore结合Redis做的一次比较优秀的消息队列重构#xff0c;其中对于点对点/发布-订阅的思路应该也是面试必考题。引言.Net TPL Dataflow是一个进程内数据流管道#xff0c;应对高并发、低延迟的要求非常有效#xf… 这是年中首发在博客园上的文章个人觉得是AspNetCore结合Redis做的一次比较优秀的消息队列重构其中对于点对点/发布-订阅的思路应该也是面试必考题。引言 .Net TPL Dataflow是一个进程内数据流管道应对高并发、低延迟的要求非常有效 但在实际Docker部署的过程中 有一个问题一直无法回避单体程序部署的瞬间服务不可用会有少量流量无法处理更糟糕的情况下迭代部署的这个版本有问题上线后无法工作 导致更多流量没有处理。 背负神圣使命巨大压力的程序猿心生一计为何不将单体程序改成分布式增加服务ReceiverAppReceiverApp只接受数据WebApp只处理数据。知识储备 消息队列和订阅发布作为老生常谈的两个知识点被反复提及按照JMS的规范 官方称为点对点(point to point queue)和发布/订阅publish/subscribetopic点对点 生产者发送消息到Message Queue中然后消费者从队列中取出消息并消费。队列会保留消息直到他们被消费或超时 ① MQ支持多消费者但每个消息只能被一个消费者处理② 发送者和消费者在时间上没有依赖性当发送者发送消息之后不管消费者有没有在运行甚至不管有没有消费者都不会影响到消息被发送到队列③ 一般消费者在消费之后需要向队列应答成功如果你希望发送的每个消息都应该被成功处理你应该使用p2p模型发布/订阅 消息生产者将消息发布到Channel在此之前已有多个消费者订阅该通道。和点对点方式不同发布到特定通道的消息会被通道订阅者收到。通道没有暂存队列机制发布的消息只能被当前收听的订阅者接收到① 每个消息可以有多个订阅者② 发布者和消费者有时间上依赖性某通道的订阅者必须先创建该通道订阅才能收到消息发布消息至通道不关注订阅者是谁订阅者可收听自己感兴趣的多个通道类似于topic也不关注发布者是谁。③ 故如果没有订阅者发布的消息将得不到处理头脑风暴Redis内置的List数据结构能形成轻量级消息队列的效果Redis原生支持发布/订阅 模型如上分析 Pub/Sub模型在订阅者宕机的时候发布的消息得不到处理故此模型不能用于强业务的数据接收和处理。本次采用的消息队列模型解耦业务新建ReceiverApp作为生产者专注于接收并发送到队列原有的WebApp作为消费者专注数据处理。起到削峰填谷的作用若缩放出多个WebApp消费者容器还能形成负载均衡的效果。 需要关注Redis操作List结构的两个命令( 左进右出右进左出同理) LPUSH RPOP/BRPOPBrpop中的B 表示Block,是一个rpop命令的阻塞版本若指定List没有新元素在给定时间内该命令会阻塞当前redis客户端连接直到超时返回nilAspNetCore编程实践本次使用AspNetCore 完成RedisMQ的实践引入Redis国产第三方开源库CSRedisCore生产者ReceiverApp生产者使用LPush命令向Redis List数据结构写入消息。------------------截取自Startup.cs-------------------------
public void ConfigureServices(IServiceCollection services)
{// Redis客户端要定义成单例 不然在大流量并发收数的时候 会造成redis client来不及释放。另一方面也确认api控制器不是单例模式var csredis new CSRedisClient(Configuration.GetConnectionString(redis),namereceiver);RedisHelper.Initialization(csredis);services.AddSingleton(csredis);services.AddMvc();
}
------------------截取自数据接收Controller-------------------
[Route(batch)]
[HttpPost]
public async Task BatchPutEqidAndProfileIds([FromBody]ListEqidPair eqidPairs)
{if (!ModelState.IsValid)throw new ArgumentException(Http Body Payload Error.);var redisKey ${DateTime.Now.ToString(yyyyMMdd)}; eqidPairs await EqidExtractor.EqidExtractAsync(eqidPairs);if (eqidPairs ! null eqidPairs.Any())RedisHelper.LPush(redisKey, eqidPairs.ToArray());await Task.CompletedTask;}
消费者WebApp 根据以上RedisMQ思路事件消费方式是拉取pull故需要轮询Redis List数据结构这里使用AspNetCore内置的BackgroundService后台服务类后台轮询消费关注后台Job中的循环接收方法。public class BackgroundJob : BackgroundService
{private readonly IEqidPairHandler _eqidPairHandler;private readonly CSRedisClient[] _cSRedisClients;private readonly IConfiguration _conf;private readonly ILogger _logger;public BackgroundJob(IEqidPairHandler eqidPairHandler, CSRedisClient[] csRedisClients,IConfiguration conf,ILoggerFactory loggerFactory){_eqidPairHandler eqidPairHandler;_cSRedisClients csRedisClients;_conf conf;_logger loggerFactory.CreateLogger(nameof(BackgroundJob));}protected override async Task ExecuteAsync(CancellationToken stoppingToken){_logger.LogInformation(Service starting);if (_cSRedisClients[0] null){_cSRedisClients[0] new CSRedisClient(_conf.GetConnectionString(redis) ,defaultDatabase 0);}RedisHelper.Initialization(_cSRedisClients[0]);while (!stoppingToken.IsCancellationRequested){var key $eqidpair:{DateTime.Now.ToString(yyyyMMdd)};var eqidpair RedisHelper.BRPop(5, key);if (eqidpair ! null)await _eqidPairHandler.AcceptEqidParamAsync(JsonConvert.DeserializeObjectEqidPair(eqidpair));// 强烈建议无论如何休眠一段时间防止突发大流量导致WebApp进程CPU满载自行根据场景设置合理休眠时间await Task.Delay(10, stoppingToken);}_logger.LogInformation(Service stopping);}
}
迭代验证使用docker-compose单机部署Nginx,ReceiverApp,WebApp容器。docker-compose up指令默认只会重建[Service配置或Image变更]的容器。If there are existing containers for a service, and the service’s configuration or image was changed after the container’s creation, docker-compose up picks up the changes by stopping and recreating the containers (preserving mounted volumes). To prevent Compose from picking up changes, use the --no-recreate flag.做一次迭代验证更新docke-compose.yml文件WebApp服务的镜像版本docker-compose up下图显示仅 数据处理容器 WebApp被RecreateNice,分布式改造完成效果很明显现在可以放心安全的迭代核心WebApp数据处理程序。 https://redis.io/commands/brpop https://redis.io/commands/lpush文字制图均为原创扫码点赞让干货飞一会。............往期推荐 TPL Dataflow组件应对高并发,低延迟要求docker stack,docker-compose前世今生点赞的朋友年后老板加鸡腿