新吁网站建设,网站制作学生信息管理,企业网上申报系统,国外网站 国内做镜像2019独角兽企业重金招聘Python工程师标准 一、Routing(路由) (using the Java client)在前面的学习中#xff0c;构建了一个简单的日志记录系统#xff0c;能够广播所有的日志给多个接收者#xff0c;在该部分学习中#xff0c;将添加一个新的特点#xff0… 2019独角兽企业重金招聘Python工程师标准 一、Routing(路由) (using the Java client) 在前面的学习中构建了一个简单的日志记录系统能够广播所有的日志给多个接收者在该部分学习中将添加一个新的特点就是可以只订阅一个特定的消息源也就是说能够直接把关键的错误日志消息发送到日志文件保存起来不重要的日志信息文件不保存在磁盘中但是仍然能够在控制台输出那么这便是我们这部分要学习的消息的路由分发机制。 二、Bindings绑定 在前面的学习中已经创建了绑定(bindings)代码如下 channel.queueBind(queueName, EXCHANGE_NAME, ); 一个绑定就是一个关于exchange和queue的关系它可以简单的被理解为队列是从这个exchange中获取消息的。 绑定可以采取一个额外的routingKey的参数为了避免与basicPublish参数冲突称之为一个绑定Key这是如何创建一个带routingKey的绑定的关键。 channel.queueBind(queueName, EXCHANGE_NAME, black); 一个绑定Key依赖于exchange的类型像之前使用fanout类型的exchange完全忽略了该绑定key的值。 三、Direct exchange直接交换机 前面实现的日志记录系统中广播所有的消息给所有的消费者现在对其进行扩展允许根据信息的严重程度来对消息进行过滤比如希望一个程序写入到磁盘的日志消息只接收错误的消息而不是浪费磁盘保存所有的日志消息。 为了实现这个目标使用一个fanout类型的exchange显然是不能够满足这样的需求的因为它只能广播所有的消息。 为此将使用一个direct exchange来代替fanout exchangedirect exchange使用简单的路由算法将消息通过绑定的Key匹配将要到达的队列。 从上面的结构图中可以看出direct exchange X绑定着两个queue(Q1,Q2)第一个queue绑定的routingKey为orange第二个有两个routingKey被绑定一个routingKey为black另外一个routingKey为green. 说明发送带有routingKey为orange的消息到X(exchange)中X将该消息路由到Q1中发送带有routingKey为black和green的消息都将被路由到Q2中其他所有消息将会被丢弃。 四、Multiple bindings多绑定 多个队列绑定相同的routingKey是允许的在上述实例中可以把X和Q1用routingKey:black绑定起来这种情况下direct exchange将像fanout类型的exchange一样会将消息广播都到所有匹配的queues中即一个routingKey为black的消息将会被发送到Q1和Q2中。 五、Emitting logs发送的日志 使用direct代替fanout类型的exchange发送消息到一个direct exchange中将根据消息的重要程度作为routingKey这样接收程序能够选择它想要接收的日志信息首先必须先创建一个exchange. channel.exchangeDeclare(EXCHANGE_NAME, direct); 其次发送一条信息 channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); 为了简化程序将severity设定为info、warning、error三种类型中的一种。 六、Subscribing订阅消息 接收者根据自己感兴趣的severity来创建一个新到的绑定。 String queueName channel.queueDeclare().getQueue();for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity);
} 七、Putting it all together代码实现 EmitLogDirect.java代码清单如下 [java] view plain copy print ? public class EmitLogDirect { private static final String EXCHANGE_NAME direct_logs; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory new ConnectionFactory(); factory.setHost(localhost); Connection connection factory.newConnection(); Channel channel connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, direct); String severity getSeverity(argv); String message getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println( [x] Sent severity : message ); channel.close(); connection.close(); } //.. } ReceiveLogsDirect代码 清单如下: [java] view plain copy print ? public class ReceiveLogsDirect { private static final String EXCHANGE_NAME direct_logs; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory new ConnectionFactory(); factory.setHost(localhost); Connection connection factory.newConnection(); Channel channel connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, direct); String queueName channel.queueDeclare().getQueue(); if (argv.length 1){ System.err.println(Usage: ReceiveLogsDirect [info] [warning] [error]); System.exit(1); } for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println( [*] Waiting for messages. To exit press CTRLC); QueueingConsumer consumer new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery consumer.nextDelivery(); String message new String(delivery.getBody()); String routingKey delivery.getEnvelope().getRoutingKey(); System.out.println( [x] Received routingKey : message ); } } } 编译和往常一样(参见以往教程用于编译和类路径的建议)。现在,为了方便起见,我们将使用一个环境变量$CP(%CP%在Windows上)的运行时类路径的例子。
如果你只想保存 “警告”和“错误”(而不是“信息”)日志消息到一个文件,打开一个控制台和type [*] Waiting for logs. To exit press CTRLC [x] Sent error:Run. Run. Or it will explode. $ java -cp $CP ReceiveLogsDirect warning error logs_from_rabbit.log$ java -cp $CP ReceiveLogsDirect info warning error$ java -cp $CP EmitLogDirect error Run. Run. Or it will explode. 转载于:https://my.oschina.net/zhanghaiyang/blog/594446