有什么比较好的做海报网站,莆田网站建设网,网站宣传创意视频,想要标注倾斜直线的实际长度文章目录一.RabbitMQ简介二.相关知识1.AMQP2.JMS是什么 #xff1f;三.RabbitMQ的工作原理四.Hello World1.创建Maven工程2.生产者3.消费者五.总结一.RabbitMQ简介
MQ全称为Message Queue#xff0c;即消息队列#xff0c; RabbitMQ是由erlang语言开发#xff0c;基于AMQP…
文章目录一.RabbitMQ简介二.相关知识1.AMQP2.JMS是什么 三.RabbitMQ的工作原理四.Hello World1.创建Maven工程2.生产者3.消费者五.总结一.RabbitMQ简介
MQ全称为Message Queue即消息队列 RabbitMQ是由erlang语言开发基于AMQPAdvanced Message Queue 高级消息队列协议协议实现的消息队列它是一种应用程序之间的通信方法消息队列在分布式系统开发中应用非常广泛。RabbitMQ官方地址http://www.rabbitmq.com/
开发中消息队列通常有如下应用场景 1、任务异步处理。 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。 2、应用程序解耦合 MQ相当于一个中介生产方通过MQ与消费方交互它将应用程序进行解耦合。
市场上还有哪些消息队列 ActiveMQRabbitMQZeroMQKafkaMetaMQRocketMQ、Redis。
为什么使用RabbitMQ呢 1、简单功能强大。 2、基于AMQP协议。 3、社区活跃文档完善。 4、高并发性能好这主要得益于Erlang语言。 5、Spring Boot默认已集成RabbitMQ
二.相关知识
1.AMQP 总结AMQP是一套公开的消息队列协议最早在2003年被提出它旨在从协议层定义消息通信数据的标准格式为的就是解决MQ市场上协议不统一的问题。RabbitMQ就是遵循AMQP标准协议开发的MQ服务。 百度链接AMQP 官方官网
2.JMS是什么 总结JMS是java提供的一套消息服务API标准其目的是为所有的java应用程序提供统一的消息通信的标准类似java的jdbc只要遵循jms标准的应用程序之间都可以进行消息通信。
JMS和AMQP有什么 不同 jms是java语言专属的消息服务标准它是在api层定义标准并且只能用于java应用而AMQP是在协议层定义的标准是跨语言的。
三.RabbitMQ的工作原理
下图是RabbitMQ的基本结构 组成部分说明如下 Producer消息生产者即生产方客户端生产方客户端将消息发送到MQ。
Broker消息队列服务进程此进程包括两个部分Exchange和Queue。 Exchange消息队列交换机按一定的规则将消息路由转发到某个队列对消息进行过虑。 Queue消息队列存储消息的队列消息到达队列并转发给指定的消费方。
Consumer消息消费者即消费方客户端接收MQ转发的消息。
消息发布和接收流程 -----发送消息----- 1、生产者和Broker建立TCP连接。 2、生产者和Broker建立通道。 3、生产者通过通道消息发送给Broker由Exchange将消息进行转发。 4、Exchange将消息转发到指定的Queue队列
----接收消息----- 1、消费者和Broker建立TCP连接 2、消费者和Broker建立通道 3、消费者监听指定的Queue队列 4、当有消息到达Queue时Broker默认将消息推送给消费者。 5、消费者接收到消息
四.Hello World 1.创建Maven工程
创建生产者工程和消费者工程分别加入RabbitMQ java client的依赖。 test-rabbitmq-producer生产者工程 test-rabbitmq-consumer消费者工程
dependency
groupIdcom.rabbitmq/groupId
artifactIdamqp‐client/artifactId
version4.0.3/version!‐‐此版本与spring boot 1.5.9版本匹配‐‐
/dependency
dependency
groupIdorg.springframework.boot/groupId
artifactIdspring‐boot‐starter‐logging/artifactId
/dependency2.生产者
在生产者工程下的test中创建测试类如下
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer01 {//队列private static final String QUEUE helloworld;public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(127.0.0.1);connectionFactory.setPort(5672);//端口connectionFactory.setUsername(guest);connectionFactory.setPassword(guest);//设置虚拟机一个mq服务可以设置多个虚拟机每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost(/);Connection connection null;Channel channel null;try {//建立新连接connection connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel connection.createChannel();//声明队列如果队列在mq 中没有则要创建//参数String queue, boolean durable, boolean exclusive, boolean autoDelete, MapString, Object arguments/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化如果持久化mq重启后队列还在* 3、exclusive 是否独占连接队列只允许在该连接中访问如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除队列不再使用时是否自动删除此队列如果将此参数和exclusive参数设置为true就可以实现临时队列队列不用了就自动删除* 5、arguments 参数可以设置一个队列的扩展参数比如可设置存活时间*/channel.queueDeclare(QUEUE,true,false,false,null);//发送消息//参数String exchange, String routingKey, BasicProperties props, byte[] body/*** 参数明细* 1、exchange交换机如果不指定将使用mq的默认交换机设置为* 2、routingKey路由key交换机根据路由key来将消息转发到指定的队列如果使用默认交换机routingKey设置为队列的名称* 3、props消息的属性* 4、body消息内容*///消息内容String message hello world 黑马程序员;channel.basicPublish(,QUEUE,null,message.getBytes());System.out.println(send to mq message);} catch (Exception e) {e.printStackTrace();} finally {//关闭连接//先关闭通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}
3.消费者
在消费者工程下的test中创建测试类如下
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 入门程序消费者* author Administrator* version 1.0* create 2018-06-17 9:25**/
public class Consumer01 {//队列private static final String QUEUE helloworld;public static void main(String[] args) throws IOException, TimeoutException {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(127.0.0.1);connectionFactory.setPort(5672);//端口connectionFactory.setUsername(guest);connectionFactory.setPassword(guest);//设置虚拟机一个mq服务可以设置多个虚拟机每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost(/);//建立新连接Connection connection connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel connection.createChannel();//监听队列//声明队列如果队列在mq 中没有则要创建//参数String queue, boolean durable, boolean exclusive, boolean autoDelete, MapString, Object arguments/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化如果持久化mq重启后队列还在* 3、exclusive 是否独占连接队列只允许在该连接中访问如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除队列不再使用时是否自动删除此队列如果将此参数和exclusive参数设置为true就可以实现临时队列队列不用了就自动删除* 5、arguments 参数可以设置一个队列的扩展参数比如可设置存活时间*/channel.queueDeclare(QUEUE,true,false,false,null);//实现消费方法DefaultConsumer defaultConsumer new DefaultConsumer(channel){/*** 当接收到消息后此方法将被调用* param consumerTag 消费者标签用来标识消费者的在监听队列时设置channel.basicConsume* param envelope 信封通过envelope* param properties 消息属性* param body 消息内容* throws IOException*/Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange envelope.getExchange();//消息idmq在channel中用来标识消息的id可用于确认消息已接收long deliveryTag envelope.getDeliveryTag();//消息内容String message new String(body,utf-8);System.out.println(receive message:message);}};//监听队列//参数String queue, boolean autoAck, Consumer callback/*** 参数明细* 1、queue 队列名称* 2、autoAck 自动回复当消费者接收到消息后要告诉mq消息已接收如果将此参数设置为tru表示会自动回复mq如果设置为false要通过编程实现回复* 3、callback消费方法当消费者接收到消息要执行的方法*/channel.basicConsume(QUEUE,true,defaultConsumer);}
}
五.总结
1、发送端操作流程 1创建连接connection 2创建通道channel 3声明队列channel.queueDeclare 4发送消息channel.basicPublish
2、接收端 1创建连接 2创建通道 3声明队列 4监听队列 5接收消息 6ack回复