博客网站大全,如何查看网站使用什么程序做的,推广公司的网站,网站开发付款方式和比例RabbitMQ结构 Publisher #xff1a; 生产者 Queue: 存储消息的容器队列#xff1b; Consumer:消费者 Connection#xff1a;消费者与消息服务的TCP连接 Channel:信道#xff0c;是TCP里面的虚拟连接。例如#xff1a;电缆相当于TCP#xff0c;信道是一条独立光纤束 生产者 Queue: 存储消息的容器队列 Consumer:消费者 Connection消费者与消息服务的TCP连接 Channel:信道是TCP里面的虚拟连接。例如电缆相当于TCP信道是一条独立光纤束一条TCP连接上创建多少条信道是没有限制的。TCP一旦打开就会出AMQP信道。无论是发布消息接收消息订阅队列这些动作都是通过信道完成的。 Broker: 一台消息服务就是一个Broker; Exchange:交换机、负责接收生产者的消息转发到队列中、交换机和队列通过路由键绑定、可以理解为每个队列都有自己的名称
SpringBoot整合RabbitMQ
Queue 消息存放于队列中 若是RabbitMQ挂了则消息会丢失因此要开启持久化 将durable设置为true,若是没有消费者消费该队列则该队列会自动删除 因此需要将autoDelete参数设置为false; public Queue(String name) {// 队列名称 是否持久化是否独占 是否自动删除this(name, true, false, false);}RabbitListener
RabbitListener(bindingsQueueBinding(value Queue(value${mq.config.queue.info},autoDeletetrue),exchangeExchange(value${mq.config.exchange},typeExchangeTypes.DIRECT),key${mq.config.queue.info.routing.key}))用来标记消费者exchange表示交换器信息、类型bindings表示监听器要绑定的队列、以及队列信息 key代表交换机和队列通过key绑定的
AmqpTemplate / RabbitTempldate: 生产者通过依赖此工具类发送消息
先安装RabbitMQ创建SpringBoot项目修改配置
# 应用名称
spring.application.nameboolfilter# 应用服务 WEB 访问端口
server.port8080spring.rabbitmq.host127.0.0.1
spring.rabbitmq.port5672
spring.rabbitmq.usernameguest
spring.rabbitmq.passwordguest入门级别程序
发送hello world程序 生产者
public class Tut1Sender {Autowiredprivate RabbitTemplate template;Autowiredprivate Queue queue;Scheduled(fixedDelay 1000, initialDelay 500)public void send() {String message Hello World!;this.template.convertAndSend(queue.getName(), message);System.out.println( [x] Sent message );}
}消费者
RabbitListener(queues hello)
public class Tut1Receiver {RabbitHandlerpublic void receive(String in) {System.out.println( [x] Received in );}
}将生产者、消费者注入容器
Configuration
EnableScheduling
public class Tut1Config {Beanpublic Queue hello() {return new Queue(hello);}Beanpublic Tut1Receiver receiver() {return new Tut1Receiver();}Beanpublic Tut1Sender sender() {return new Tut1Sender();}
}运行结果 [x] Sent ‘Hello World!’ [x] Received ‘Hello World!’ [x] Sent ‘Hello World!’ [x] Received ‘Hello World!’ [x] Sent ‘Hello World!’ … 工作队列
主要思想是避免 立即执行资源密集型任务必须等待它要完成。相反我们将任务安排在以后完成。我们将任务封装为消息并将其发送到队列。正在运行的工作进程 在后台将弹出任务并最终执行工作 生产者
public class Tut2Sender {Autowiredprivate RabbitTemplate template;Autowiredprivate Queue queue;AtomicInteger dots new AtomicInteger(0);AtomicInteger count new AtomicInteger(0);Scheduled(fixedDelay 1000, initialDelay 500)public void send() {在这里插入代码片StringBuilder builder new StringBuilder(Hello);if (dots.incrementAndGet() 4) {dots.set(1);}for (int i 0; i dots.get(); i) {builder.append(.);}builder.append(count.incrementAndGet());String message builder.toString();template.convertAndSend(queue.getName(), message);System.out.println( [x] Sent message );}}消费者
RabbitListener(queues hello)
public class Tut2Receiver {private final int instance;public Tut2Receiver(int i) {this.instance i;}RabbitHandlerpublic void receive(String in) throws InterruptedException {StopWatch watch new StopWatch();watch.start();System.out.println(instance this.instance [x] Received in );doWork(in);watch.stop();System.out.println(instance this.instance [x] Done in watch.getTotalTimeSeconds() s);}private void doWork(String in) throws InterruptedException {for (char ch : in.toCharArray()) {if (ch .) {Thread.sleep(1000);}}}
}队列、生产者、消费者注入容器
Configuration
public class Tut2Config {Beanpublic Queue hello() {return new Queue(hello);}private static class ReceiverConfig {Beanpublic Tut2Receiver receiver1() {return new Tut2Receiver(1);}Beanpublic Tut2Receiver receiver2() {return new Tut2Receiver(2);}}Beanpublic Tut2Sender sender() {return new Tut2Sender();}
}运行结果 [x] Sent ‘Hello.1’ instance 1 [x] Received ‘Hello.1’ [x] Sent ‘Hello…2’ instance 2 [x] Received ‘Hello…2’ instance 1 [x] Done in 1.0062309s [x] Sent ‘Hello…3’ instance 1 [x] Received ‘Hello…3’ instance 2 [x] Done in 2.0085791s [x] Sent ‘Hello.4’ instance 2 [x] Received ‘Hello.4’ … 消息确认
SpringBoot整合RabbitMQ代码中若消费者消费出现异常则会重新进入队列 一般生产环境中是要有重试机制的若是要关闭重试机制、则设置defaultRequeueRejectedfalse 或者抛出AmqpRejectAndDontRequeueException异常这样框架会帮我们自动提交确认channel.basicAck()重试机制也会存在问题、若是消费者服务关闭、则消息会不断重新入队、导致RabbitMQ内存最终爆满宕机消息的ACK确认机制默认是打开的如果忘记了ACK后果很严重当Consumer退出时消息会一直重新分发然后RabbitMq会占用越来越多的内存由于RabbitMq会长时间运行出现“内存泄露”是致命的
异常处理方案
使用try-catch捕捉使用重试机制、超过一定次数、则丢弃消息或放入死信队列 spring.rabbitmq.listener.retry.max-attempts5 //重试超过5次消息丢弃 公平调度与循环调度 默认情况下RabbitMQ 会将每条消息发送给下一个消费者。平均而言每个消费者将获得相同数量的 消息。这种分发消息的方式称为轮询。 在这种模式下调度不一定完全按照我们想要的方式工作。 若是存在两台机器一台性能好、一台性能差 而RabbitMQ对此一无所知仍然会调度 消息均匀。发生这种情况是因为 RabbitMQ 只是在消息时调度消息 进入队列。它不看未确认的数量 面向消费者的消息。它只是盲目地发送每 n 条消息 给第 n 个消费者这就导致了一台机器特别忙碌、一台机器空闲 “公平调度”是Spring AMQP的默认配置。Consumer可以向服务器声明一个prefetchCount, 表示轮到自己时、自己可处理多少消息这样RabbitMQ转发消息给消费者时、会先看Consumer正在处理的消息数量是否达到了prefetchCount, 若已达到该值则发给其他的Consumer;
发布/订阅 特点一条消息同时会被所有消费者消息X是交换机(Exchange)交换机和队列进行绑定(Binding) 交换机负责接收生产者发送的消息再转发消息到队列中实现了生产者与队列的解耦
RabbitMQ 中消息传递模型的核心思想是生产者 从不将任何消息直接发送到队列
示例1 广播匿名队列
发送者
public class Tut3Sender {Autowiredprivate RabbitTemplate template;Autowiredprivate FanoutExchange fanout;AtomicInteger dots new AtomicInteger(0);AtomicInteger count new AtomicInteger(0);Scheduled(fixedDelay 1000, initialDelay 500)public void send() {StringBuilder builder new StringBuilder(Hello);if (dots.getAndIncrement() 3) {dots.set(1);}for (int i 0; i dots.get(); i) {builder.append(.);}builder.append(count.incrementAndGet());String message builder.toString();template.convertAndSend(fanout.getName(), , message);System.out.println( [x] Sent message );}}消费者
public class Tut3Receiver {RabbitListener(queues #{autoDeleteQueue1.name})public void receive1(String in) throws InterruptedException {receive(in, 1);}RabbitListener(queues #{autoDeleteQueue2.name})public void receive2(String in) throws InterruptedException {receive(in, 2);}public void receive(String in, int receiver) throws InterruptedException {StopWatch watch new StopWatch();watch.start();System.out.println(instance receiver [x] Received in );doWork(in);watch.stop();System.out.println(instance receiver [x] Done in watch.getTotalTimeSeconds() s);}private void doWork(String in) throws InterruptedException {for (char ch : in.toCharArray()) {if (ch .) {Thread.sleep(1000);}}}}交换机、匿名队列、绑定生产者、消费者注入容器
public class Tut3Config {Beanpublic FanoutExchange fanout() {return new FanoutExchange(tut.fanout);}private static class ReceiverConfig {Beanpublic Queue autoDeleteQueue1() {return new AnonymousQueue();}Beanpublic Queue autoDeleteQueue2() {return new AnonymousQueue();}Beanpublic Binding binding1(FanoutExchange fanout,Queue autoDeleteQueue1) {return BindingBuilder.bind(autoDeleteQueue1).to(fanout);}Beanpublic Binding binding2(FanoutExchange fanout,Queue autoDeleteQueue2) {return BindingBuilder.bind(autoDeleteQueue2).to(fanout);}Beanpublic Tut3Receiver receiver() {return new Tut3Receiver();}}Beanpublic Tut3Sender sender() {return new Tut3Sender();}
}运行结果
instance 1 [x] Received Hello.1
instance 2 [x] Received Hello.1
instance 2 [x] Done in 1.0057994s
instance 1 [x] Done in 1.0058073s
....模拟Spring容器发布ContextRefreshedEvent事件
通常情况下业务开发中经常会监听该事件做扩展例如初始化数据 打印日志等等 生产者
public class AppContextSender {AutowiredRabbitTemplate rabbitTemplate;Scheduled(fixedDelay 1000, initialDelay 500)public void publishContextRefreshEvent() {rabbitTemplate.convertAndSend(contextRefreshedExchange, , publish refreshed event);}
}
消费者
RabbitListener(queues {initQueue})
public class InitContextRefreshedConsumer {RabbitHandlerpublic void consum(String in) {System.out.println(init :in);}
}RabbitListener(queues logQueue)
public class LogContextRefreshedConsumer {RabbitHandlerpublic void consum(String in) {System.out.println(log : in);}
}
交换机、队列、绑定、生产者、消费者注入容器
Configuration
public class ContextRefreshedConfig {Beanpublic FanoutExchange contextRefreshedExchange(){return new FanoutExchange(contextRefreshedExchange);}Beanpublic AppContextSender appContextSender() {return new AppContextSender();}public static class ConsumerConfig {Beanpublic Queue initQueue() {return new Queue(initQueue);}Beanpublic Queue logQueue() {return new Queue(logQueue);}Beanpublic Binding initBinding(Queue initQueue, FanoutExchange contextRefreshedExchange) {return BindingBuilder.bind(initQueue).to(contextRefreshedExchange);}Beanpublic Binding logBinding(Queue logQueue, FanoutExchange contextRefreshedExchange) {return BindingBuilder.bind(logQueue).to(contextRefreshedExchange);}Beanpublic InitContextRefreshedConsumer initContextRefreshedConsumer() {return new InitContextRefreshedConsumer();}Beanpublic LogContextRefreshedConsumer logContextRefreshedConsumer() {return new LogContextRefreshedConsumer();}}} log : publish refreshed event init :publish refreshed event log : publish refreshed event init :publish refreshed event … Direct直接模式
交换器绑定多个队列每个绑定关系有自己的路由键之前业务开发中、有一个交换机、绑定了两个队列一个队列用来发送邮件一个队列用来发送短信 像广播模式下如果只想发邮件则没法t做到使用direct模式和工作模式则可以做到 最后使用了direct 生产者
public class BaseServiceSender {Autowiredprivate RabbitTemplate template;Autowiredprivate DirectExchange messageExchange;AtomicInteger index new AtomicInteger(0);AtomicInteger count new AtomicInteger(0);private final String[] keys {sms, mail};Scheduled(fixedDelay 1000, initialDelay 500)public void send() {//短信String sms {userName: xxx; phone:xxx};template.convertAndSend(messageExchange.getName(), sms, sms);//邮件String mail {userName: xxx; mail:xxx};template.convertAndSend(messageExchange.getName(), mail, mail);}
}
消费者
RabbitListener(queues mailQueue)
public class MailConsumer {RabbitHandlerpublic void consum(String in) {System.out.println(send mail : in);}
}RabbitListener(queues smsQueue)
public class SmsConsumer {RabbitHandlerpublic void consum(String in) {System.out.println(send sms : in);}
}
交换机、队列绑定、消费者生产者注入容器
Configuration
public class DirectConfig {Beanpublic DirectExchange messageExchange() {return new DirectExchange(messageExchange);}Beanpublic BaseServiceSender baseServiceSender() {return new BaseServiceSender();}public static class ConsumerGroup {Beanpublic MailConsumer mailConsumer() {return new MailConsumer();}Beanpublic SmsConsumer smsConsumer() {return new SmsConsumer();}Beanpublic Queue mailQueue() {return new Queue(mailQueue);}Beanpublic Queue smsQueue() {return new Queue(smsQueue);}Beanpublic Binding smsBinding(DirectExchange messageExchange, Queue smsQueue){return BindingBuilder.bind(smsQueue).to(messageExchange).with(sms);}Beanpublic Binding mailBinding(DirectExchange messageExchange, Queue mailQueue){return BindingBuilder.bind(mailQueue).to(messageExchange).with(mail);}}
}
运行结果 send mail : {userName: xxx; mail:xxx} send sms : {userName: xxx; phone:xxx} send sms : {userName: xxx; phone:xxx} send mail : {userName: xxx; mail:xxx} … Topic主题模式
发送到主题交换的消息不能有任意routing_key 它必须是单词列表由点分隔。这 单词可以是任何东西一些有效的路由密钥示例 “stock.usd.nyse” “nyse.vmw” “quick.orange.rabbit”。可以有 路由密钥中随心所欲地包含多个单词最多可达 255 个 字节。 绑定密钥也必须采用相同的形式。主题交换背后的逻辑类似于直接交换 - 发送的消息带有 特定的路由键将被传递到所有队列 绑定匹配的绑定键*星号可以代替一个词。#哈希可以替换零个或多个单词。 若是消息指定的路由键为xxx.orange.xxx 则会匹配到Q1, 若是lazy.xxx.xx则是Q2;
生产者
public class Tut5Sender {Autowiredprivate RabbitTemplate template;Autowiredprivate TopicExchange topic;AtomicInteger index new AtomicInteger(0);AtomicInteger count new AtomicInteger(0);private final String[] keys {quick.orange.rabbit, lazy.orange.elephant, quick.orange.fox,lazy.brown.fox, lazy.pink.rabbit, quick.brown.fox};Scheduled(fixedDelay 1000, initialDelay 500)public void send() {StringBuilder builder new StringBuilder(Hello to );if (this.index.incrementAndGet() keys.length) {this.index.set(0);}String key keys[this.index.get()];builder.append(key).append( );builder.append(this.count.incrementAndGet());String message builder.toString();template.convertAndSend(topic.getName(), key, message);System.out.println( [x] Sent message );}}消费者
public class Tut5Receiver {RabbitListener(queues #{autoDeleteQueue1.name})public void receive1(String in) throws InterruptedException {receive(in, 1);}RabbitListener(queues #{autoDeleteQueue2.name})public void receive2(String in) throws InterruptedException {receive(in, 2);}public void receive(String in, int receiver) throwsInterruptedException {StopWatch watch new StopWatch();watch.start();System.out.println(instance receiver [x] Received in );doWork(in);watch.stop();System.out.println(instance receiver [x] Done in watch.getTotalTimeSeconds() s);}private void doWork(String in) throws InterruptedException {for (char ch : in.toCharArray()) {if (ch .) {Thread.sleep(1000);}}}
}
交换器队列绑定、生产者消费者注入容器
Configuration
public class Tut5Config {Beanpublic TopicExchange topic() {return new TopicExchange(tut.topic);}private static class ReceiverConfig {Beanpublic Tut5Receiver receiver() {return new Tut5Receiver();}Beanpublic Queue autoDeleteQueue1() {return new AnonymousQueue();}Beanpublic Queue autoDeleteQueue2() {return new AnonymousQueue();}Beanpublic Binding binding1a(TopicExchange topic,Queue autoDeleteQueue1) {return BindingBuilder.bind(autoDeleteQueue1).to(topic).with(*.orange.*);}Beanpublic Binding binding1b(TopicExchange topic,Queue autoDeleteQueue1) {return BindingBuilder.bind(autoDeleteQueue1).to(topic).with(*.*.rabbit);}Beanpublic Binding binding2a(TopicExchange topic,Queue autoDeleteQueue2) {return BindingBuilder.bind(autoDeleteQueue2).to(topic).with(lazy.#);}}Beanpublic Tut5Sender sender() {return new Tut5Sender();}}运行结果 [x] Sent ‘Hello to lazy.orange.elephant 1’ instance 2 [x] Received ‘Hello to lazy.orange.elephant 1’ instance 1 [x] Received ‘Hello to lazy.orange.elephant 1’ [x] Sent ‘Hello to quick.orange.fox 2’ [x] Sent ‘Hello to lazy.brown.fox 3’ instance 1 [x] Done in 2.0110456s … RPC远程过程调用
RabbitMQ也实现了RPC的功能但是业务开发中根本没有使用场景RPC要么使用Dubbo, 要么使用OpenFeign, 使用RabbitMQ做RPC的信息目前都没有看到
总结
就目前来说、工作队列、发布订阅两个模式业务开发中会使用到其他的消息场景很少见。底层是基于RabbitMQ-client做的封装出RabbitTempldate使用除非远古项目否则不推荐使用RabbitMQ-Client原生API写太费时间了。我写了一会就放弃了