网站建设|,北京城乡与建设厅官方网站查询,湛江搭建做网站在哪里做,网站地链接结构文章目录 AMQP协议的回顾RabbitMQ支持的消息模型第一种模型(直连)开发生产者开发消费者生产者、消费者开发优化API参数细节 第二种模型(work quene)开发生产者开发消费者消息自动确认机制 第三种模型(fanout)开发生产者开发消费者 第四种模型(Routing)开发生产者开发消费者 第五… 文章目录 AMQP协议的回顾RabbitMQ支持的消息模型第一种模型(直连)开发生产者开发消费者生产者、消费者开发优化API参数细节 第二种模型(work quene)开发生产者开发消费者消息自动确认机制 第三种模型(fanout)开发生产者开发消费者 第四种模型(Routing)开发生产者开发消费者 第五种模型(Topic)开发生产者开发消费者 AMQP协议的回顾 在RabbitMQ中有生产者、消费者的概念生产者先与我们的RabbitMQServer建立连接建立完连接之后它会把消息通过连接中通道的形式去传递我们的消息。每一个生产者会对应一个专门的虚拟主机。 在我们做项目的时候RabbitMQ希望我们每一个项目具有单独的虚拟主机这样我们多个应用在操作同一个RabbitMQServer的时候互不影响所以这里的虚拟机有点像关系型数据库中的库概念。 我们在访问虚拟主机的时候是需要权限的如果需要访问到某一个具体的虚拟主机我们需要将虚拟主机与用户进行绑定。 比如RabbitMQ默认为我们提供的guest账户他是可以访问所有的虚拟主机的具有至高无上的权限。在我们实际的生产环境中我们一般是一个项目访问一个虚拟主机或者说是一个业务访问一个虚拟主机在访问的时候我们一般为一个虚拟主机绑定特定的用户。 当我们的生产者通过通道将消息放入到虚拟机之中因为RabbitMQ存在许多的消息模型所以这里不一定会把消息放入到交换机之中。也就是说当生产者将消息传递给交换机或者队列之后他的任务就告一段落了。
这个时候我们的生产者和消费者是完全解耦的我们不需要关心生产者到底有没有运行我只关心消费者监听的队列里面有没有对应的消息即可。 消费者在消费消息的时候也需要去连接到我们RabbitMQServer以及虚拟主机我们才能消费到对应主机中的消息队列里面的数据。 RabbitMQ支持的消息模型 最新的版本有第七种消息模型消息确认模型 第一种模型(直连) 在上图的模型中有以下概念
P生产者也就是要发送消息的程序C消费者消息的接受者会一直等待消息到来。queue消息队列图中红色部分。类似一个邮箱可以缓存消息生产者向其中投递消息消费者从其中取出消息。
首先我们先创建一个新用户/ems然后将一个虚拟主机与其绑定然后给他添加超级用户权限 注意 用户名必须以/开头 开发生产者
public class Provider {//生产消息Testpublic void testSendMessage() throws IOException, TimeoutException {//创建连接mq的连接工厂对象ConnectionFactory connectionFactory new ConnectionFactory();//设置连接rabbitmqserver主机connectionFactory.setHost(10.15.0.9);//设置端口号connectionFactory.setPort(5672);//设置连接那个虚拟主机connectionFactory.setVirtualHost(/ems);//设置访问虚拟主机的用户名和密码connectionFactory.setUsername(ems);connectionFactory.setPassword(123);//获取连接对象Connection connection connectionFactory.newConnection();//获取连接中通道Channel channel connection.createChannel();//通道绑定对应消息队列//参数1: 队列名称 如果队列不存在自动创建//参数2: 用来定义队列特性是否要持久化 true 持久化队列 false 不持久化//参数3: exclusive 是否独占队列 true 独占队列 false 不独占//参数4: autoDelete: 是否在消费完成后自动删除队列 true 自动删除 false 不自动删除//参数5: 额外附加参数channel.queueDeclare(hello,true,false,false,null);//发布消息//参数1: 交换机名称 参数2:队列名称 参数3:传递消息额外设置 参数4:消息的具体内容channel.basicPublish(,hello, null,hello rabbitmq.getBytes());channel.close();connection.close();}
}开发消费者
public class Customer {public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(10.15.0.9);connectionFactory.setPort(5672);connectionFactory.setVirtualHost(/ems);connectionFactory.setUsername(ems);connectionFactory.setPassword(123);//创建连接对象Connection connection connectionFactory.newConnection();*///创建通道Channel channel connection.createChannel();//通道绑定对象channel.queueDeclare(hello,true,false,false,null);//消费消息//参数1: 消费那个队列的消息 队列名称//参数2: 开始消息的自动确认机制//参数3: 消费时的回调接口channel.basicConsume(hello,true,new DefaultConsumer(channel){Override //最后一个参数: 消息队列中取出的消息public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}});}}注意 在使用Junit测试的时候他是不支持多线程模型的。如果我们使用Test去运行的话他没法让我们的消费者去监听(运行完之后直接就杀死了该进程不会处于监听状态)所以这里我们要换成一个main函数。 生产者则不需要注意这一点因为它生产完消息就完事了 我们发现生产者生产完消息之后会关闭通道和链接而在消费这里我们并没有这么做。这是因为可能会导致我们的回调函数还没来得及执行我们的通道就已经关闭。 该模型的特点 点对点的简单消费模型。 适用于登录、注册场景 生产者、消费者开发优化
我们发现我们在开发生产者、消费者的时候前面的连接部分代码重复冗余所以我们可以使用一个工具类对其进行封装
public class RabbitMQUtils {private static ConnectionFactory connectionFactory;private static Properties properties;static{//重量级资源 类加载执行之执行一次connectionFactory new ConnectionFactory();connectionFactory.setHost(10.15.0.5);connectionFactory.setPort(5672);connectionFactory.setVirtualHost(/);connectionFactory.setUsername(guest);connectionFactory.setPassword(guest);}//定义提供连接对象的方法public static Connection getConnection() {try {return connectionFactory.newConnection();} catch (Exception e) {e.printStackTrace();}return null;}//关闭通道和关闭连接工具方法public static void closeConnectionAndChanel(Channel channel, Connection conn) {try {if(channel!null) channel.close();if(conn!null) conn.close();} catch (Exception e) {e.printStackTrace();}}}
我们这里使用静态代码块是因为connectionFactory是重量级资源所以我们决定只在类加载执行时执行一次。 我们这里稍微复习一下java的静态代码块我们会发现在一些项目源码中经常会见到他。 静态代码块语法格式 static{}静态代码块的特点随着类的加载而执行而且只执行一次 执行优先级高于非静态的初始化块它会在类初始化的时候执行一次执行完成便销毁它仅能初始化类变量即static修饰的数据成员。 那么正好我们再来提一下非静态代码块 非静态代码块语法格式 {}执行的时候如果有静态初始化块先执行静态初始化块再执行非静态初始化块在每个对象生成时都会被执行一次它可以初始化类的实例变量。非静态初始化块会在构造函数执行时在构造函数主体代码执行之前被运行。 执行顺序: 静态代码块-----非静态代码块--------构造函数 API参数细节
生产者和消费者均有一个方法queueDeclare就是声明操作的队列
channel.queueDeclare(hello,true,false,false,null);参数1: 队列名称 如果队列不存在自动创建 参数2: 用来定义队列特性是否要持久化 true 持久化队列false 不持久化注意这里说的是队列的持久化也就是如果开启持久化的话我们即使重启rabbitmq服务该队列也会存在因为其内部会把队列从内存写到硬盘中去。当重启完成之后其又会重新将硬盘中的队列读到内存中去 参数3: exclusive 是否独占队列 true 独占队列也就是说队列只能被当前通道所绑定false 不独占 参数4: autoDelete: 是否在消费完成后自动删除队列 true 自动删除false 不自动删除这里的自动删除队列是指消费者不再监听占用队列队列才会消失 参数5: 额外附加参数 注意直连模型下消费者和生产者的queueDeclare中的参数要保持一致这样才能保证操作的是同一个队列 生产者
channel.basicPublish(,hello, MessageProperties.PERSISTENT_TEXT_PLAIN,hello rabbitmq.getBytes());参数1: 交换机名称 (我们这里没有使用交换机所以没有指定)参数2:队列名称参数3:传递消息额外设置 MessageProperties.PERSISTENT_TEXT_PLAIN我们可以通过此参数设置消息在队列中的持久化 参数4:消息的具体内容 这里是以字节的方式进行传输
消费者
channel.basicConsume(hello,true,new DefaultConsumer(channel){Override //最后一个参数: 消息队列中取出的消息public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}
});参数1: 消费哪个队列的消息 队列名称参数2: 开始消息的自动确认机制参数3: 消费时的回调接口 这里我们可以传入一个consumer对象而这个consumer是一个接口它有一个实现类DefaultConsumer
第二种模型(work quene)
Work queues也被称为Task queues任务模型。当消息处理比较耗时的时候可能生产消息的速度会远远大于消息的消费速度。长此以往消息就会堆积越来越多无法及时处理。此时就可以使用work 模型让多个消费者绑定到一个队列共同消费队列中的消息。队列中的消息一旦消费就会消失因此任务是不会被重复执行的。 角色
P生产者任务的发布者C1消费者-1领取任务并且完成任务假设完成速度较慢C2消费者-2领取任务并完成任务假设完成速度快
开发生产者
public class Provider {public static void main(String[] args) throws IOException {//获取连接对象Connection connection RabbitMQUtils.getConnection();//获取通道对象Channel channel connection.createChannel();//通过通道声明队列channel.queueDeclare(work, true, false, false, null);for (int i 1; i 20; i) {//生产消息channel.basicPublish(, work, null, (i hello work quene).getBytes());}//关闭资源RabbitMQUtils.closeConnectionAndChanel(channel, connection);}
}我们这里使用了前面提到的连接工具类 我们运行我们的代码 这里的Unacked代表未被确认的消息
开发消费者
如果我们对两个消费者不做任何处理
消费者-1
public class Customer1 {public static void main(String[] args) throws IOException {//获取连接Connection connection RabbitMQUtils.getConnection();final Channel channel connection.createChannel();channel.queueDeclare(work,true,false,false,null);//参数1:队列名称 参数2:消息自动确认 true 消费者自动向rabbitmq确认消息消费 false 不会自动确认消息channel.basicConsume(work,false,new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(消费者-1: new String(body)); }});}
}
消费者-2
public class Customer2 {public static void main(String[] args) throws IOException {//获取连接Connection connection RabbitMQUtils.getConnection();final Channel channel connection.createChannel();channel.queueDeclare(work,true,false,false,null);//参数1:队列名称 参数2:消息自动确认 true 消费者自动向rabbitmq确认消息消费 false 不会自动确认消息channel.basicConsume(work,false,new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(消费者-2: new String(body)); }});}
}
在这种不做任何处理的情况下消费者1、消费者2消费的消息都是一致的 总结:默认情况下RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。也就是说平均分配 而这样的话我们不难想到一个问题加入我们的消费者1处理的比较慢消费者2处理的比较快。这就导致消费者1的消息会在队列中造成滞留消费者2可能已经处理完闲着了。这样的情况下平均分配显然也会影响效率并且导致消息再队列中的积累。
我们可以模拟一下这个情况我们在消费者1中添加一个线程睡眠 这个时候我们运行发现在消费者2将自己的消息打印完之后消费者1的消息只打印了一条 那么能不能用第二种模型实现一种能者多劳的模式呢
消息自动确认机制 Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We’ll also lose all the messages that were dispatched to this particular worker but were not yet handled. But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker. 完成一项任务可能需要几秒钟。你可能想知道如果其中一个消费者开始了一项长任务但只完成了一部分任务就去世了会发生什么。使用我们当前的代码一旦RabbitMQ将消息传递给消费者它会立即将其标记为删除。在这种情况下如果你杀死一个消费者我们将丢失它正在处理的消息。我们还将丢失发送给该特定工作人员但尚未处理的所有消息。 但我们不想失去任何任务。如果一名消费者死亡我们希望将任务交付给另一名消费者。 自动确认是指当消息一旦被Consumer接收到则自动确认收到并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中很可能消息接收到业务处理出现异常那么该消息就会丢失。如果设置了手动确认方式则需要在业务处理成功后调用channel.basicAck()手动签收如果出现异常则调用channel.basicNack()方法让其自动重新发送消息。 我们使用能者多劳的模式需要进行两步额外的操作 设置通道一次只能消费一个消息 关闭消息的自动确认,开启手动确认消息
Customer1
public class Customer1 {public static void main(String[] args) throws IOException {//获取连接Connection connection RabbitMQUtils.getConnection();final Channel channel connection.createChannel();channel.basicQos(1);//每一次只能消费一个消息channel.queueDeclare(work,true,false,false,null);//参数1:队列名称 参数2:消息自动确认 true 消费者自动向rabbitmq确认消息消费 false 不会自动确认消息channel.basicConsume(work,false,new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try{Thread.sleep(2000);}catch (Exception e){e.printStackTrace();}System.out.println(消费者-1: new String(body));// 参数1:确认队列中那个具体消息 参数2:是否开启多个消息同时确实channel.basicAck(envelope.getDeliveryTag(),false);}});}
}Customer2
public class Customer2 {public static void main(String[] args) throws IOException {//获取连接Connection connection RabbitMQUtils.getConnection();final Channel channel connection.createChannel();channel.basicQos(1);channel.queueDeclare(work,true,false,false,null);channel.basicConsume(work,false,new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(消费者-2: new String(body));//手动确认 参数1:手动确认消息标识 参数2:false 每次确认一个channel.basicAck(envelope.getDeliveryTag(), false);}}); }
}我们对上面两段代码的一些参数或方法做出解释
basicQos
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;参数 prefetchSize消息的大小 prefetchCount会告诉RabbitMQ不要同时给一个消费者推送多于N个消息即一旦有N个消息还没有ack则该consumer将block掉直到有消息ack global是否将上面设置应用于channel简单点说就是上面限制是channel级别的还是consumer级别
channel.basicAck()
void basicAck(long deliveryTag, boolean multiple) throws IOException;参数 deliveryTag该消息的index multiple是否批量处理. true:将一次性ack所有小于deliveryTag的消息
envelope.getDeliveryTag() 这个方法是表示消息的唯一标识ID返回的是一个正整数是rabbitmq来自增设置的
第三种模型(fanout) fanout 扇出 也称为广播 在广播模式下消息发送流程是这样的
可以有多个消费者每个消费者有自己的queue队列 我们这里创建的队列是临时的用完之后就会删除 每个队列都要绑定到Exchange交换机生产者发送的消息只能发送到交换机交换机来决定要发给哪个队列生产者无法决定。交换机把消息发送给绑定过的所有队列队列的消费者都能拿到消息。实现一条消息被多个消费者消费 使用场景 比如我们的商品购物车在我们结算的时候我们可能会跟多个系统进行交互订单系统、库存系统等等。这个时候我们的购物车信息会被多条队列给消费。 在fanout模式下路由的相关配置没有意义相关参数可以空着 开发生产者
public class Provider {public static void main(String[] args) throws IOException {//获取连接对象Connection connection RabbitMQUtils.getConnection();Channel channel connection.createChannel();//将通道声明指定交换机 //参数1: 交换机名称 参数2: 交换机类型 fanout 广播类型channel.exchangeDeclare(logs,fanout);//发送消息channel.basicPublish(logs,,null,fanout type message.getBytes());//释放资源RabbitMQUtils.closeConnectionAndChanel(channel,connection);}
}channel.exchangeDeclare(logs,fanout);:
参数1: 交换机名称参数2: 交换机类型 fanout是广播类型
开发消费者
public class Customer1 {public static void main(String[] args) throws IOException {//获取连接对象Connection connection RabbitMQUtils.getConnection();Channel channel connection.createChannel();//通道绑定交换机channel.exchangeDeclare(logs,fanout);//临时队列String queueName channel.queueDeclare().getQueue();//绑定交换机和队列channel.queueBind(queueName,logs,);//消费消息channel.basicConsume(queueName,true,new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(消费者1: new String(body));}});}
}其他几个消费者同理 我们创建三个消费者把他们开启之后再开启我们的生产者测试结果如下
第四种模型(Routing) 第五种模型其实是第四种模型的一个分支如果我们叫第四种模型为路由的话那么我们可以说第五种模型是动态路由。第四种模型我们也可以叫做direct模型(直连) 在Fanout模式中一条消息会被所有订阅的队列都消费。但是在某些场景下我们希望不同的消息被不同的队列消费。这时就要用direct类型的Exchange。
在Routing模型下
队列与交换机的绑定不能是任意绑定了而是要指定一个RoutingKey路由key消息的发送方在 向 Exchange发送消息时也必须指定消息的 RoutingKey。Exchange不再把消息交给每一个绑定的队列而是根据消息的Routing Key进行判断只有队列的Routingkey与消息的 Routing key完全一致才会接收到消息
流程: 图解
P生产者向Exchange发送消息发送消息时会指定一个routing key。XExchange交换机接收生产者的消息然后把消息递交给 与routing key完全匹配的队列C1消费者其所在队列指定了需要routing key 为 error 的消息C2消费者其所在队列指定了需要routing key 为 info、error、warning 的消息
开发生产者
public class Provider {public static void main(String[] args) throws IOException {//获取连接对象Connection connection RabbitMQUtils.getConnection();//获取连接通道对象Channel channel connection.createChannel();String exchangeName logs_direct;//通过通道声明交换机 参数1:交换机名称 参数2:direct 路由模式channel.exchangeDeclare(exchangeName,direct);//发送消息String routingkey error;channel.basicPublish(exchangeName,routingkey,null,(指定的route keykey的消息).getBytes());//关闭资源RabbitMQUtils.closeConnectionAndChanel(channel,connection);}
}这个时候我们就已经可以看到我们的交换机了
开发消费者
我们这里开发两个消费者
消费者1拿到routekey为error的消息消费者2拿到routekey为info、error、warning的消息
public class Customer1 {public static void main(String[] args) throws IOException {Connection connection RabbitMQUtils.getConnection();Channel channel connection.createChannel();String exchangeName logs_direct;//通道声明交换机以及交换的类型channel.exchangeDeclare(exchangeName,direct);//创建一个临时队列String queue channel.queueDeclare().getQueue();//基于route key绑定队列和交换机channel.queueBind(queue,exchangeName,error);//获取消费的消息channel.basicConsume(queue,true,new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(消费者1: new String(body));}});}
}public class Customer2 {public static void main(String[] args) throws IOException {Connection connection RabbitMQUtils.getConnection();Channel channel connection.createChannel();String exchangeName logs_direct;//声明交换机 以及交换机类型 directchannel.exchangeDeclare(exchangeName,direct);//创建一个临时队列String queue channel.queueDeclare().getQueue();//临时队列和交换机绑定channel.queueBind(queue,exchangeName,info);channel.queueBind(queue,exchangeName,error);channel.queueBind(queue,exchangeName,warning);//消费消息channel.basicConsume(queue,true,new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(消费者2: new String(body));}});}
}
我们测试一下
测试生产者发送Route key为error的消息时 测试生产者发送Route key为info的消息时 第五种模型(Topic)
Topic类型的Exchange与Direct相比都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符这种模型Routingkey 一般都是由一个或多个单词组成多个单词之间以”.”分割例如 item.insert *(star) 匹配不多不少恰好1个词#(hash): 匹配0个或多个词
例如
audit.# 匹配audit.irs.corporate或者 audit.irs 等
audit.* 只能匹配audit.irs开发生产者
public class Provider {public static void main(String[] args) throws IOException {//获取连接对象Connection connection RabbitMQUtils.getConnection();Channel channel connection.createChannel();//声明交换机以及交换机类型 topicchannel.exchangeDeclare(topics,topic);//发布消息String routekey user;channel.basicPublish(topics,routekey,null,(这里是topic动态路由模型,routekey: [routekey]).getBytes());//关闭资源RabbitMQUtils.closeConnectionAndChanel(channel,connection);}
}开发消费者
我们还是开发两个消费者 消费者1Routing Key中使用*通配符方式 public class Customer1 {public static void main(String[] args) throws IOException {//获取连接Connection connection RabbitMQUtils.getConnection();Channel channel connection.createChannel();//声明交换机以及交换机类型channel.exchangeDeclare(topics,topic);//创建一个临时队列String queue channel.queueDeclare().getQueue();//绑定队列和交换机 动态统配符形式route keychannel.queueBind(queue,topics,user.*);//消费消息channel.basicConsume(queue,true,new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(消费者1: new String(body));}});}
} 消费者2中Routing Key中使用#通配符方式 public class Customer2 {public static void main(String[] args) throws IOException {//获取连接Connection connection RabbitMQUtils.getConnection();Channel channel connection.createChannel();//声明交换机以及交换机类型channel.exchangeDeclare(topics,topic);//创建一个临时队列String queue channel.queueDeclare().getQueue();//绑定队列和交换机 动态统配符形式route keychannel.queueBind(queue,topics,user.#);//消费消息channel.basicConsume(queue,true,new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(消费者2: new String(body));}});}
}这个时候我们测试的结果就是
消费者2可以拿到消息消费者1拿不到消息