阿里云智能建站,千锋教育和黑马哪个好,网页布局的设计原则,网站界面设计和ios移动界面设计的区别官方文档参考#xff1a;https://www.rabbitmq.com/tutorials/tutorial-four-python.html
使用direct类型的Exchange,发N条消息并使用不同的routingKey,消费者定义队列并将队列routingKey、Exchange绑定。此时使用direct模式Exchange必须要routingKey完成匹配的情况下消息才…官方文档参考https://www.rabbitmq.com/tutorials/tutorial-four-python.html
使用direct类型的Exchange,发N条消息并使用不同的routingKey,消费者定义队列并将队列routingKey、Exchange绑定。此时使用direct模式Exchange必须要routingKey完成匹配的情况下消息才会转发到对应的队列中被消费。
样例使用日志分发为样例。即按日志不同的级别分发到不同的队列。每个队列只处理自己的对应的级别日志。
创建生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;public class Product {private static final String[] LOG_LEVEL {ERROR, WARN, INFO};public static void main(String[] args) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setUri(amqp://root:123456node1:5672/%2f);Connection connection factory.newConnection();Channel channel connection.createChannel();// 声明交换机交换器和消息队列的绑定不需要在这里处理。channel.exchangeDeclare(ex.routing,BuiltinExchangeType.DIRECT,// 持久的标识false,// 自动删除的标识false,// 属性null);for (int i 0; i 30; i) {String level LOG_LEVEL[ThreadLocalRandom.current().nextInt(0, LOG_LEVEL.length)];String dataMsg [ level ] 消息发送 : i;// 发送消息channel.basicPublish(ex.routing, level, null, dataMsg.getBytes(StandardCharsets.UTF_8));}}
}创建ERROR的消费者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;public class ErrorConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setUri(amqp://root:123456node1:5672/%2f);Connection connection factory.newConnection();Channel channel connection.createChannel();// 声明队列并绑定channel.exchangeDeclare(ex.routing,BuiltinExchangeType.DIRECT,// 持久的标识false,// 自动删除的标识false,// 属性null);// 此也可以声明为临时队列但是如果消息很重要不要声明临时队列。channel.queueDeclare(log.error,// 永久false,// 排他false,// 自动删除true,// 属性null);//消费者享有绑定到交换器的权力。channel.queueBind(log.error, ex.routing, ERROR);// 通过chanel消费消息channel.basicConsume(log.error,(consumerTag, message) - {System.out.println(ERROR收到的消息: new String(message.getBody(), StandardCharsets.UTF_8));},consumerTag - {});}
}创建INFO级的消费者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;public class InfoConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setUri(amqp://root:123456node1:5672/%2f);Connection connection factory.newConnection();Channel channel connection.createChannel();// 声明队列并绑定channel.exchangeDeclare(ex.routing,BuiltinExchangeType.DIRECT,// 持久的标识false,// 自动删除的标识true,// 属性null);// 此也可以声明为临时队列但是如果消息很重要不要声明临时队列。channel.queueDeclare(log.info,// 永久false,// 排他false,// 自动删除false,// 属性null);//消费者享有绑定到交换器的权力。channel.queueBind(log.info, ex.routing, INFO);// 通过chanel消费消息channel.basicConsume(log.info,(consumerTag, message) - {System.out.println(INFO收到的消息: new String(message.getBody(), StandardCharsets.UTF_8));},consumerTag - {});}
}创建WARN级别的消息者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;public class WarnConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setUri(amqp://root:123456node1:5672/%2f);Connection connection factory.newConnection();Channel channel connection.createChannel();// 声明队列并绑定channel.exchangeDeclare(ex.routing,BuiltinExchangeType.DIRECT,// 持久的标识false,// 自动删除的标识false,// 属性null);// 此也可以声明为临时队列但是如果消息很重要不要声明临时队列。channel.queueDeclare(log.warn,// 永久false,// 排他false,// 自动删除true,// 属性null);//消费者享有绑定到交换器的权力。channel.queueBind(log.warn, ex.routing, WARN);// 通过chanel消费消息channel.basicConsume(log.warn,(consumerTag, message) - {System.out.println(warn收到的消息: new String(message.getBody(), StandardCharsets.UTF_8));},consumerTag - {});}
}首先启动三个消费者
查看队列及交换机情况
[rootnullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ ex.routing │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[rootnullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌─────────────┬─────────────┬──────────────────┬──────────────────┬─────────────┬───────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │ arguments │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ │ exchange │ log.info │ queue │ log.info │ │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ │ exchange │ log.warn │ queue │ log.warn │ │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ │ exchange │ log.error │ queue │ log.error │ │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing │ exchange │ log.error │ queue │ ERROR │ │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing │ exchange │ log.info │ queue │ INFO │ │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing │ exchange │ log.warn │ queue │ WARN │ │
└─────────────┴─────────────┴──────────────────┴──────────────────┴─────────────┴───────────┘
[rootnullnull-os ~]# 可以发现交换器ex.routing 绑定了三个队列log.error、log.info 、log.warn并指定了路由键。
启动消费者查看消息通否被正常消费。
ERROR的消费者控制台输出
ERROR收到的消息:[ERROR] 消息发送 :1
ERROR收到的消息:[ERROR] 消息发送 :2
ERROR收到的消息:[ERROR] 消息发送 :6
ERROR收到的消息:[ERROR] 消息发送 :8
ERROR收到的消息:[ERROR] 消息发送 :9
ERROR收到的消息:[ERROR] 消息发送 :11
ERROR收到的消息:[ERROR] 消息发送 :15
ERROR收到的消息:[ERROR] 消息发送 :16
ERROR收到的消息:[ERROR] 消息发送 :19
ERROR收到的消息:[ERROR] 消息发送 :20
ERROR收到的消息:[ERROR] 消息发送 :21
ERROR收到的消息:[ERROR] 消息发送 :23
ERROR收到的消息:[ERROR] 消息发送 :24
ERROR收到的消息:[ERROR] 消息发送 :27
ERROR收到的消息:[ERROR] 消息发送 :28INFO的消费者控制台输出
INFO收到的消息:[INFO] 消息发送 :0
INFO收到的消息:[INFO] 消息发送 :3
INFO收到的消息:[INFO] 消息发送 :4
INFO收到的消息:[INFO] 消息发送 :13
INFO收到的消息:[INFO] 消息发送 :14
INFO收到的消息:[INFO] 消息发送 :22
INFO收到的消息:[INFO] 消息发送 :25WARN的消费都控制台输出:
warn收到的消息:[WARN] 消息发送 :5
warn收到的消息:[WARN] 消息发送 :7
warn收到的消息:[WARN] 消息发送 :10
warn收到的消息:[WARN] 消息发送 :12
warn收到的消息:[WARN] 消息发送 :17
warn收到的消息:[WARN] 消息发送 :18
warn收到的消息:[WARN] 消息发送 :26
warn收到的消息:[WARN] 消息发送 :29至此验证已经完成。