门户网站开发 价格,中国企业500强2022,网站设计需要考虑哪些基本原则,wordpress 运营商广告消息队列:
组成:
交换器,队列,绑定
作用:异步处理,削峰,服务解耦
交换器
RabbitMQ常见的exchange(交换器)类型: direct–路由键完全匹配才可以 fanout–广播 topic --主题,模糊匹配路由键
队列
messagequeue:
组成: 路由键 routine-key—决定消息发给谁 优先级prio…
消息队列:
组成:
交换器,队列,绑定
作用:异步处理,削峰,服务解耦
交换器
RabbitMQ常见的exchange(交换器)类型: direct–路由键完全匹配才可以 fanout–广播 topic --主题,模糊匹配路由键
队列
messagequeue:
组成: 路由键 routine-key—决定消息发给谁 优先级priority–决定消息发送的优先级 分发模式deliver-mode–决定消息的发送方式–持久化等
绑定
binding:
依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactIdversion2.6.4/version/dependency配置文件:
spring:rabbitmq:username: zzypassword: 1234host: 172.24.232.166注册队列:
Configuration
public class RabbitConfig {Beanprotected Queue queue(){Queue queue new Queue(zzy);return queue;}
}生产者发送消息
//作为信息的发布者
SpringBootTest(classes ApplicationRabbitMq.class)
public class RabbitTest {//Amqp模板类Autowiredprivate AmqpTemplate amqpTemplate;Testvoid RabbitTest(){amqpTemplate.convertAndSend(zzy,hello world);System.out.println(success);}
}消费者消费消息:
Component //加注解,不然无法解析
public class RabbitMqConsumer {RabbitListener(queues zzy)//订阅的队列public void Listened1(String msg){System.out.println(取出的消息1----msg);}RabbitListener(queues zzy)public void Listened2(String msg){System.out.println(取出的消息2----msg);}
}默认使用direct队列,如果要使用广播队列:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitFanoutConfig {//准备两个队列Beanprotected Queue queue() {return new Queue(gavin);}Beanprotected Queue queue2() {return new Queue(zzy);}//内置fanout交换器名称amq.fanoutBeanprotected FanoutExchange fanoutExchange() {return new FanoutExchange(amq.fanout);}//将交换器和队列绑定Beanprotected Binding fanoutBinding(Queue queue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queue).to(fanoutExchange);}Beanprotected Binding fanoutBinding2(Queue queue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queue2).to(fanoutExchange);}
}一个交换器可以绑定多个队列;
几个重要的注解/Bean
import org.springframework.amqp.core.AmqpTemplate;
Bean
AmqpTemplate amqpTemplate
//此模板中有发送消息的方法-------------------------------------
import org.springframework.amqp.core.Queue;
//注册一个队列directBeanprotected Queue queue() {return new Queue(gavin);}//注册一个广播交换器Beanprotected FanoutExchange fanoutExchange() {return new FanoutExchange(amq.fanout);}
//交换器绑定队列//将交换器和队列绑定Beanprotected Binding fanoutBinding(Queue queue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queue).to(fanoutExchange);}
//注册一个topic交换器Beanprotected TopicExchange topicExchange() {return new TopicExchange(amq.topic);}
//绑定队列//将交换器和队列绑定Beanprotected Binding TopicBind1(Queue queue, TopicExchange topicExchange) {return BindingBuilder.bind(queue).to(topicExchange).with(com.gavin.*);//匹配路由规则}消费者: RabbitListener(queues gavin)该方法从对列中消费消息消息重复消费原因:
消费完毕后本该向broker发送ack,但是由于网路延迟较高过了broker等待的时间,于是broker会把消息再次投递到consumer
解决方案:
数据库—处理消息前,使用消息主键在表中带有约束的字段中insert,插入成功则消费成功,插入失败则已经消费过了,不再进行消费
Map–单机版的使用ConrrentHashMap -putifAbsent
Redis --分布式锁
保证消息队列的消费顺序
同一个topic,同一个queue,发的时候让一个线程去发,消费的时候让一个线程去消费,如果多线程暂时无法保证消费的有序性
怎么保证消息发送到同一个queue?
RocketMQ 提供了一个MessageQueueSelector 接口,重写接口方法
RocketMQ如何保证消息不丢失
Producer端:
采用send()同步发消息,发送结果是同步感知的;
Broker端:
设置数显策略为同步刷新策略
集群部署,配置主从,高可用模式
Consumer端:
消费正常后再进行手动ACK确认