淘宝客网站主题,现在前端开发用什么技术,企业做企业网站的好处,网站运行费用1、什么是MQ#xff1f;
MQ全称message queue#xff08;消息队列#xff09;#xff0c;本质是一个队列#xff0c;FIFO先进先出#xff0c;是消息传送过程中保存消息的容器#xff0c;多 用于分布式系统之间进行通信。
在互联网架构中#xff0c;MQ是一种非常常见的…1、什么是MQ
MQ全称message queue消息队列本质是一个队列FIFO先进先出是消息传送过程中保存消息的容器多 用于分布式系统之间进行通信。
在互联网架构中MQ是一种非常常见的上下游“逻辑解耦物理解耦”的消息通讯服务使用了MQ后消息发送上游只需要依赖MQ不需要依赖其他的服务。 2、为什么使用MQ
流量削峰应用解耦 比如电商系统中分为订单系统支付系统库存系统物流系统如果订单系统直接调用三种系统其中一个系统出现了短暂的故障订单系统就属于不可用的状态如果使用mq,订单系统生成的订单直接存放在MQ中即便其余某个系统短暂故障订单系统不感知系统可用性增强异步处理 假设A调用B B是异步处理并且需要很长时间来处理但是A需要知道B的处理结果通常做法是 A每隔一段时间去调用B的查询函数或者A提供一个回调函数让B调用完成之后通知A。MQ提供了一种新的处理思路即B处理完之后发送一条消息给MQMQ将消息给A进行处理。
3、实现MQ的两种主流方式
两种AMQP和JMS
AMQP即 Advanced Message Queuing Protocol高级消息队列协议是一个网络协议是应用层协议的一个开放标准为面向消息的中间件设计的。JMS 即 Java 消息服务Java Message Service应用程序接口是一个java平台中关于面向消息中间件的API。是javaEE规范的一种。
区别
JMS定义了统一的接口来对消息操作进行统一而AMQP是通过协议规定了数据交互格式JMS限制了必须使用javaAMQP只是协议不规定实现方式是跨语言的JMS规定了两种消息模式AMQP的消息模式更加丰富 4、MQ的选择
kafka主要特点是基于pull的模式来处理消息消费追求高吞吐量一开始的目的就是用于日志收集和传输适合产生大量数据的数据收集业务如果有日志采集功能首选kafkarocketMQ: 适用于金融互联网领域对可靠性要求很高的场景适用阿里双11尤其是电商里面的订单扣款、业务削峰等稳定性好适用于并发场景。rabbitMQ性能好时效性微妙级别社区活跃度高功能完备管理界面使用方便适合数据量没有那么大的中小型公司。 5、rabbitMQ中的四大核心概念
生产者产生数据发送消息的程序交换机rabbitMQ中非常重要的一个部件一方面接收来自生产者的消息另一方面将消息推送到队列中。交换机必须要确切知道如何处理接收到的消息队列存放消息的数据结构本质是一个大的消息缓冲区消费者大多数情况是一个等待接收消息的程序 6、rabbitMQ的基本结构 producer消息生产者即生产消息的客户端consumer消息消费者即消费消息的客户端接收MQ转发的消息connectionproducer/consumer 和broker之间的TCP连接channel如果每一次访问rabbitMQ都建立一个connection在消息量大的时候建立TCP连接的开销是巨大的效率也很低channel是在connection内部建立的逻辑连接一个连接内包含多个信道每次发消息只占用一个信道这样就极大的减少了建立connection的开销。broker接收和分法消息的应用消息队列的服务进程包括两个部分exchange和queueexchange消息队列交换机按照一定的规则将消息路由转发给到某个队列对消息进行过滤queue消息队列存储消息bindingexchange和queue之间的虚拟连接 生产者生产消息的过程
producer先连接到broker这个步骤需要先建立connection连接并开启一个信道channelproducer声明一个交换器并设置相关属性 交换器写空字符串会使用默认的交换器producer声明一个队列并设置相关属性producer通过绑定将交换器和队列进行绑定producer发送消息到broker其中包含路由键交换器等信息交换器根据收到的路由键查找对应的队列如果找到就会将消息存入相应的队列如果没有找到会根据producer的配置选择丢弃或者是退回给生产者关闭信道
消费者接收消息的过程
consumer连接到broker建立connection连接开启一个信道consumer请求消费相应队列中的消息可以设置响应的回调函数等待broker回应并投递相应队列中的消息接收消息consumer确认收到消息ack响应rabbitMQ接收到ack将队列中的消息删除关闭信道。 7、rabbitMQ的消息应答机制ack
rabbitMQ向消费者传递完消息后会删除该条消息kafka中是不删除的这个是一点差异
为了保证消息在发送过程中不丢失rabbitMQ引入了消息应答机制消费者在接收消息并处理该消息后告诉rabbitMQ他已经处理了此时rabbitMQ就可以把该消息删除了。
自动应答消息一旦被消费者接收自动发送ack手动应答消息接收后不会发送ack需要手动调用
如何选择应答方式呢
如果消息不太重要丢失也没有影响那么选择自动ack会比较好--- 性能高可能丢失数据如果不允许消息丢失那么需要选择在消费完成后手动ack --- 可靠性高性能稍差
8、rabbitMQ消息的重新入列
如果消费者由于某些原因失去连接导致消费者未成功发送ACK确认应答RabbitMQ将会对未完全处理完的消息重新入队如果其他消费者可以处理则该消息将被分配到另一个消费者从而保证消息未丢失。 9、rabbitMQ的持久化
队列持久化消息持久化exchange持久化
持久化只是告诉rabbitMQ将消息保存到磁盘但是并不能真正的保证数据不丢失准备从内存往磁盘写的时候rabbitMQ挂掉了 队列持久化是在定义队列的时候由durable参数决定的设置为true的时候才会持久化队列。
Connection connection connectionFactory.newConnection();
Channel channel connection.createChannel();
//第二个餐胡设置为true代表队列持久化
channel.queueDeclare(queue.persistent.name, true, false, false, null);消息持久化是在发布消息的时候设置的
//通过传入MessageProperties.PERSISTENT_PLAIN就可以实现消息持久化
channel.basicPublish(exchange.persistent, persistent, MessageProperties.PERSISTENT_TEXT_PLAIN, persistent_test_message.getBytes());exchange的持久化如果不设置exchange的持久化对消息的可靠性来说没有什么影响**但是同样如果exchange不设置持久化那么当broker服务重启之后exchange将不复存在那么既而发送方rabbitmq producer就无法正常发送消息。因此建议同样设置exchange的持久化。
一般只需要channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout” true); 就是在声明的时候讲durable字段设置为true就行了。 10、rabbitMQ的分发机制以及如何修改
rabbitMQ默认的分发机制是轮询模式是公平的但是实际场景中并不适用比如consumerA处理消息很快consumerB处理消息很满那么轮询的机制就会导致consumerA有很多时间处于空闲因此需要修改成能者多劳的模式。
如何实现
对信道进行设置通过 BasicQos 方法设置prefetchCount 1需要注意的是不公平分发只在手动ack的时候才会生效。 11、rabbitMQ中的预取值
预取值是消费者信道最大传输信息数。上面说了如何设置rabbitMQ 的不公平分发即设置prefetchCount 1其实这个值是可以设置更大的数字的这个设置的值就是预取值。 我们将慢的消费者preCount取值为5快的消费者预取值为2然后发送7条消息实际慢的服务器会收到5条消息第一条处理的时候其余四条会堆积快的服务器只会收到2条消息。
这是因为快的消费者信道满了不能再发送消息所以消息只能发送给慢的服务器这就是basicQos用法。 12、rabbitMQ的发布确认机制
发布确认机制有三种方式
单个确认发布一种简单的同步确认发布的方式也即是只有前一个发布的消息确认发布之后后续的消息才可以继续发布。 缺点就是发布速度慢没有确认发布的消息会阻塞后续消息发布适用于每秒数百条消息吞吐量的环境。批量确认发布也是同步确认的方式一样会阻塞后续消息的发布但是可以先发布一批消息然后一起确认提高吞吐量缺点就是发生故障导致发布失败后不知道那个消息有问题必须将整个批处理保存在内存中来记录重要的信息然后重新发布消息异步确认发布效率和可靠性都比较高利用回调函数来达到消息的可靠性传递这种情况下所有在该信道上发布的消息都会被指派一个唯一的ID 一旦消息被投递到所有匹配的队列后rabbitMQ就会发送一个确认给生产者包含这个消息的唯一id这样生产者就知道消息已经正确的到达目的队列了如果rabbitMQ没能处理这个消息也会发送一个NACK 的消息给producer这时就可以进行重试操作。
13、如何处理异步未确认的消息 简陋版本 将未确认的消息放到一个基于内存的能够被发布线程访问的队列中能够在confirm callbacks线程和发布线程之间进行消息传递。 比如使用ConcurrentSkipListMap这个是基于并发的有序map集合。ConcurrentHashMap是无序的 1、RabbitMQ的消息确认机制确保了消息的可靠抵达其中ConfirmCallback是其中一种实现方式
ConfirmCallback是一个回调函数用于在消息被确认时进行回调以确保消息已经被正确地发送到RabbitMQ Broker并被处理。当生产者发送消息时可以通过调用channel的confirmSelect()方法将channel设置为confirm模式然后通过添加ConfirmCallback回调函数来处理消息确认。当消息被发送到Broker后如果Broker成功地将消息路由到目标队列则会调用ConfirmCallback回调函数的handleAck()方法表示消息已被确认。如果Broker无法将消息路由到目标队列则会调用handleNack()方法表示消息未被确认使用ConfirmCallback可以确保消息已经被正确地发送到RabbitMQ Broker并被处理从而避免了消息丢失或重复发送的情况。同时ConfirmCallback还可以在消息未被确认时进行重试或记录日志等操作以确保消息的可靠性和稳定性。 2、RabbitMQ的ReturnCallback机制是为了解决消息无法路由到指定队列的问题。
当发送的消息无法被路由到指定队列时RabbitMQ会将消息返回给生产者这时候如果生产者设置了ReturnCallback回调函数就可以在回调函数中处理这种情况ReturnCallback机制的使用场景一般是在消息发送时指定了mandatory参数为true表示如果消息无法被路由到指定队列则将消息返回给生产者。如果mandatory参数为false则消息会被直接丢弃。当生产者设置了ReturnCallback回调函数后RabbitMQ在将消息返回给生产者时会触发该回调函数。在ReturnCallback回调函数中可以处理消息无法路由的情况例如重发消息、记录日志等。需要注意的是ReturnCallback机制只有在消息被发送到交换机后才会触发。如果消息发送的交换机不存在或者路由键不符合任何绑定规则消息会被直接丢弃不会触发ReturnCallback回调函数。
3、备份交换机
通过mandatory参数和消息回退机制可以处理交换机投递失败的消息但是消息回退给生产者后有时候并不知道如何处理这些消息最多就是打印一个日志存在缓存中然后定时重试投递还要考虑多次投递失败后的告警等等。如果生产者多了的话每个生产者都要写这些逻辑代码无疑大大增加了生产者的复杂性。
rabbitMQ中有死信队列可以处理消费失败的信息但是当前所说的这些消息根本就没有进入队列因此死信队列也没有用。在 RabbitMQ 中有一种备份交换机的机制存在可以很好的应对这个问题。
备份交换机可以理解为 RabbitMQ 中交换机的“备胎”当我们为某一个交换机声明一个对应的备份交换机时就是为它创建一个备胎当交换机接收到一条不可路由消息时将会把这条消息转发到备份交换机中由备份交换机来进行转发和处理通常备份交换机的类型为 Fanout 这样就能把所有消息都投递到与其绑定的队列中然后我们在备份交换机下绑定一个队列这样所有那些原交换机无法被路由的消息就会都进入这个队列了。 当然还可以建立一个报警队列用独立的消费者来进行监测和报警。
mandatory 参数与备份交换机可以一起使用的时候如果两者同时开启备份交换机的优先级更高。 14、rabbitMQ中的交换机exchange
rabbitMQ消息传递的模型核心思想是生产者生产的消息不直接发送到队列而是通过交换机。事实上生产者压根不知道发送到了哪些队列交换机的功能十分简单一方面接收来自生产者的消息另一方面将消息推入队列。
交换机必须知道如何处理接收到的消息是放入特定的队列还是放入很多队列还是丢弃这些是由交换机的类型决定的。
bindingsbinding其实就是exchange和queue之间的桥梁即是绑定关系
交换机的类型
无名exchange默认exchange声明的时候就是一个空字符串但是通过routingkey绑定queue扇出交换机 fanout: 就是将受到的所有消息广播到他知道的所有队列中routingkey可以是空字符串直接交换机 direct消息只到交换机绑定的队列中通过routingkey 来绑定如果所有队列的routingkey都一样那么就相当于是fanout 交换机了主题交换机 topictopic 交换机的消息的 routing_key 不能随意写必须满足一定的要求它必须是一个单词列表以点号分隔开类似于正则表达式*(星号)可以代替一个单词 #(井号)可以替代零个或多个单词头部交换机 header不通过RoutingKey进行分发消息而时通过消息中内容的headers的 key/value(键值对)匹配队列 性能不高用的少 15、死信和死信队列
什么是死信
在rabbitMQ中消息可能有不同的表现死信顾名思义就是dead message。死信消息通常包括以下几种
消息被拒绝即rabbitMQ返回了一个nack信号消息的TTL过期了消息队列达到最大长度后续消息无法入列消息不符合要求等。。
什么是死信队列
死信队列就是用于存储死信的队列死信队列中有且只有死信构成不会存在其余类型的消息。
死信队列在rabbitMQ中并不会单独存在通常死信队列都会绑定一个普通的消息队列当绑定的消息队列中有消息变成死信了那么这个消息就会重新被交换机路由到指定的死信队列中我们可以通过对这个死信队列进行监听从而手动去对这些消息进行补偿。 如何使用死信队列
在 RabbitMQ 中死信队列的标识为 x-dead-letter-exchange 通过观察死信队列的标识我们不难发现其标识最后为 exchange 即 RabbitMQ 中的交换机RabbitMQ 中的死信队列就是由死信交换机而得出的要想使用死信队列我们需要首先声明一个普通的消息队列并将死信队列的标识绑定到这个普通的消息队列上。 16rabbitMQ中处理消息失败了怎么办
生产环境中使用MQ的时候设计两个队列一个是业务队列专门用来处理消息另外一个死信队列用来处理异常情况。
比如消费者消费消息时数据库等发生了故障无法将数据写入数据库这时消费者就可以将该条消息返回一个nack
一旦返回nackMQ就会将这条消息转入提前设置好的死信队列中数据库故障期间处理的所有失败消息都会转入死信队列消费者设置一个后台线程监控数据库是否正常一旦发现数据库正常后这个线程就把死信队列中的消息取出来重新消费 17、rabbitMQ的延迟队列
延迟队列的内部是有序的最重要的特性就是体现在它的延迟属性上延迟队列中的元素就是希望在指定的时间到了之后将他取出来消费。
延迟队列的使用场景在某个事件发生之后或者之前的指定时间内要做的任务
订单在十分钟内未支付自动取消新用户注册后三天没有登录短信提醒用户退款三天内没有 处理通知相关运营人员预定会议后提前十分钟通知与会人员 18、rabbitMQ的延迟队列怎么实现
18.1死信队列 TTL 过期时间
rabbitMQ并没有直接提供延迟队列功能但是可以通过 死信队列 TTL 过期时间进行实现TTL就是消息或者队列的过期功能。当消息过期就会进到死信队列死信队列和普通队列没啥区别然后我们只需要配置一个消费者来消费死信队列里面的消息就可以了
注意 RabbitMQ只会对队列头部的消息进行过期淘汰消息是否过期是在即将投递消息到消费者之前判定的如果队列出现消息堆积情况则已过期的消息还是会继续存活的比如过期时间设置在消息内由于消息队列是先进先出的假设第一个消息过期时间是10s第二个消息过期时间是1s一前一后几乎同时发消息1s的已经过期了但是10s的还没有过期那么第二个消息也不会从队列中剔除转到死信队列从而导致消息不断积压。 18.2基于插件实现延迟队列
rabbitMQ还可以通过安装插件来实现延迟队列安装过程略。
使用延迟插件的情况下延迟时间短的消息会被优先消费解决了死信队列TTL过期时间导致的消息积压问题。通过交换机延迟消息的方式来实现消息的延迟 上面介绍了rabbitMQ中的延迟队列实现方式当然还有一些其他的选择比如利用java自带的DelayQueue 利用redis中的zset利用kafka的时间轮等等这些方式各有特点可以根据不同的适用场景选择不同的实现方式。 1、DelayQueue DelayQueue是java自带的一个BlockingQueue用于放置实现了Delayed接口的对象。队列中的对象只能在其到期的时候才能从队列中取出。添加元素触发Delayed接口中的compareTo方法按照时间进行排序排在队列头部是最早到期的越往后越晚到期查看元素消费者线程查看元素调用getDelay方法如果方法返回值小于等于0说明元素已经到期则会取出否则返回wait的时间wait时间之后在从头部取出元素注意不能将null放入DelayQueue中。 大数据必学Java基础六十七DelayQueue深入了解 - 知乎 2、redis中的zeset redis 中zset的存储结构是k-v其中value包含了memmber和score通过score可以进行排序生产者将需要延迟发送的数据存redis中的 zset消费者循环从redis的zset队列中获取数据消费时间到了的数据然后删除已经消费了的数据 3、kafka实现延迟队列 创建一个专门的Topic用于存储延迟消息在消息的key中设置延迟时间戳。可以使用当前时间戳加上延迟时间作为key消费者进程不断检查消息的key中的时间戳是否已经过期。 可以使用当前时间戳与消息的key中的时间戳进行比较。如果时间戳已经过期则将消息重新发送到目标Topic中例如target-messages如果时间戳还未过期则将消息重新发送到delayed-messages Topic中并设置一个新的延迟时间戳。 kafka实现延迟队列需要消费者定期从delayed-messages 中查看消息消费者进程宕机就会影响延迟队列功能轮询检查也会消耗资源延迟精度只能达到毫秒级别。 需要注意的是Kafka并不是专门为延迟队列设计的因此在实现过程中需要考虑一些细节问题比如消息的重复消费、消息的顺序等。 19、rabbitMQ的幂等性
幂等性是指用户对统一操作发起的一次或者多次请求结果都是一致的不会因为重复消费而导致结果不一样。
rabbitmq 把消息发给消费者进行消费消费者消费成功后返回ack消息但是这个时候网络中断等原因rabbitMQ没有收到ack消息让rabbitMQ误以为消息消费失败然后rabbitMQ把消息重新发送给其他消费者或者等网络重连后重新发给这个消费者这个时候就会造成重复消费问题。
解决思路
消费者解决幂等性的一般方法就是使用一个唯一标识ID消费前先判断是否已经消费过。
唯一id数据库主键去重redis原子性利用setnx命令天然的幂等性。 20、优先级队列
顾名思义优先级队列可以对元素设置优先级优先级高的消息具备优先消费的特权。
RabbitMQ支持优先级队列在声明channel的时候添加 “x-max-priority”属性RabbitMQ中优先级大小支持0-255但是实际使用我们可以根据需要设置最大的优先级值。
当然在消费端速度大于生产端速度且broker中没有消息堆积的话对发送的消息设置优先级也没什么实际意义因为发送端刚发送完一条消息就被消费端消费了那么就相当于broker至多只有一条消息那么对于单条消息来说优先级是没有什么意义的 21、惰性队列
RabbitMQ从3.6版本引入了惰性队列这一概念惰性队列会尽可能的将消息存入磁盘中消费者消费到响应的消息时才会被加载到内存中他的一个重要的目标是支持更多的消息存储。
默认情况下当生产者将消息发送到RabbitMQ的时候队列中的消息会尽可能地存储在内存之中这样可以更加快速地将消息发送给消费者。即使是持久化的消息在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ需要释放内存的时候会将内存中的消息换页至磁盘中这个操作会耗费较长的时间也会阻塞队列的操作进而无法接收新的消息。虽然RabbitMQ的开发者们一直在升级相关的算法但是效果始终不太理想尤其是在 消息量特别大的时候。惰性队列会将接收到的消息直接存入文件系统而不管是持久化的或者是非持久化的这样可以减少内存的消耗但是会增加I/O的使用如果消息是持久化的那么这样的I/O操作不可避免惰性队列和持久化的消息可谓是“最佳拍档”。注意如果惰性队列中存储的是非持久化的消息内存的使用率会一直很稳定但是重启之后消息一样会丢失。使用x-queue-mode设置为 lazy 22、RabbitMQ集群
RabbitMQ集群有两种模式普通集群和镜像集群。
1、普通集群
就是将RabbitMQ部署到多台服务器上每台服务器启动一个RabbitMQ实例多个实例之间进行消息通信。在普通集群上我们创建的队列queue他的元数据queue的一些配置信息会在所有的RabbitMQ实例中进行同步但是队列中的消息只会存在于一个RabbitMQ实例上不会同步到其他队列。当消费消息的时候如果连接到了另外一个实例那么实例会通过元数据定位到queue所在的位置然后访问queue所在的实例拉取数据过来发送给消费者这种集群可以提高RabbitMQ的消费吞吐能力但是无法保证高可用因为一旦存消息的RabbitMQ挂了消息就没办法访问了。 2、镜像集群
和普通集群的最大区别就是queue数据不在单独存在一台机器上而是同时存储在多台机器上。也就是说每个RabbitMQ都至少有一份镜像数据副本数据。每次写入消息的时候都会自动把数据同步到多台实例上去这样即便一台机器宕机其他机器上还有副本数据可以继续提供服务继而实现了高可用。 23、rabbitMQ中的federation exchange联邦交换机
应用场景
有时候为了容灾等原因会将rabbitMQ部署在不同的城市当跨距离传输的时候会有网络延迟等原因。federation exchange 提供了一个能力可以让原本发送给上游交换器的消息路由到本地的某个队列中联邦队列则允许一个本地消费者接收到来自上游队列的消息。
federation的原理
联邦交换机首先需要创建出下游队列广州的broker3federation插件会在北京broker1上建立一个同名的交换器同时内部创建一个内部交换机并通过路由将两个交换机绑定起来。federation插件还会在broker1 上简历一个队列并和broker3中的交换机之间建立一条AMQP连接来实时地消费队列federation: exchangeA.broker3中的数据对外而言客户端只能看到federation连接是建立在broker1 exchangeA 和brokr3 exchangeA 之间。 24、rabbitMQ中的shovel
shovel插件同样是为了解决数据的转发问题。它能够可靠地从源端broker中的队列中拉取数据并转发到目的端broker的交换机中作为源端的队列和作为目的端的交换机可以位于一个broker中没理解也可以位于不同的broker上shovel的优点松耦合解决不同Broker、集群、用户、vhost、MQ和Erlang版本移动消息支持广域网可以容忍糟糕的网络能保证消息的可靠性高度定制当Shovel成功连接后可以配置。 拓展实现一个定时任务的方法
遍历所有的任务根据时间来判断是否需要执行 优点逻辑简单缺点每秒都要遍历所有的任务很多距离到期时间还远的任务做了很多无用功数据量大的时候会导致任务执行延迟占用CPU根据执行时间采用小顶堆算法每次都取最小的时间进行判读 优点相比较全部遍历比较次数变少缺点数据量大的时候每次插入新数据时间复杂度为Ologn, 但是还有可能导致任务延迟java中的TimerScheduledThreadPoolExcutor 就是这种做法jdk自带的DelayQueue,每次插入都要重新排队时间复杂度Onlogn时间轮 kafka时间轮的原理秒懂 Kafka 时间轮TimingWheel - 知乎避免时间轮的空转从带圈数的时间轮改为多层时间轮 其实就是从单纯小圈转改成先大圈转转到一定位置后然后在小圈转【第一层的跨度为1ms第二层的跨度为20ms第三层的跨度为400ms。那么例如我们放入的任务为501ms则将会放入第三层的第一个节点501%400101冗余了101ms当第三层的指针转到第一个节点时则将101ms的任务转移到第二层再将任务放入到第二层的第5个节点101%201。当第二层的指针转移到低5个节点的时候发现冗余时间则将任务转移到第一层的第一个节点第一层转移一次就执行了。这么做的好处是避免了单轮空转的情况。】