手机酒店网站建设,自动推送代码wordpress教程,微商城app开发公司,wordpress+模板宽度2.7日学习打卡 JMS 由于MQ产品很多#xff0c;操作方式各有不同#xff0c;于是JAVA提供了一套规则 ——JMS#xff0c;用于操作消息中间件。JMS即Java消息服务 #xff08;JavaMessage Service#xff09;应用程序接口#xff0c;是一个Java平台中关于面 向消息中间件的…2.7日学习打卡 JMS 由于MQ产品很多操作方式各有不同于是JAVA提供了一套规则 ——JMS用于操作消息中间件。JMS即Java消息服务 JavaMessage Service应用程序接口是一个Java平台中关于面 向消息中间件的API。JMS是JavaEE规范中的一种类比JDBC。很多 MQ产品都实现了JMS规范例如ActiveMQ。RabbitMQ官方并没 有实现JMS规范但是开源社区有JMS的实现包。
创建项目
# 开启管控台插件
rabbitmq-plugins enable
rabbitmq_management
# 启动rabbitmq
rabbitmq-server -detached创建普通maven项目添加RabbitMQ依赖
dependenciesdependencygroupIdcom.rabbitmq/groupIdartifactIdamqpclient/artifactIdversion5.14.0/version/dependency
/dependencies一. RabbitMQ 简单模式
P生产者也就是要发送消息的程序
C消费者消息的接收者会一直等待消息到来
queue消息队列图中红色部分。类似一个邮箱可以缓存消息生产者向其中投递消息消费者从其中取出消息
特点
一个生产者对应一个消费者通过队列进行消息传递。该模式使用direct交换机direct交换机是RabbitMQ默认交换机
生产者代码实现
步骤 创建连接工厂ConnectionFactory设置工厂的参数创建连接 Connection创建管道 Channel简单模式中没有交换机exchange所以不用创建RabbitMQ会使用默认的交换机创建队列 queue设置发送内容使用channal.basicPublish()发送释放资源 代码实现
package com.jjy.mq.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//使用自己的服务器ip地址connectionFactory.setHost(192.168.66.100);//rabbitmq的默认端口5672connectionFactory.setPort(5672);//用户名connectionFactory.setUsername(jjy);//密码connectionFactory.setPassword(jjy);//虚拟机connectionFactory.setVirtualHost(/);//2.创建连接Connection connection connectionFactory.newConnection();//3.建立信道Channel channel connection.createChannel();//4.创建队列如果队列已存在则使用该队列/**// * 参数1队列名// * 参数2是否持久化true表示MQ重启后队列还在。// * 参数3是否私有化false表示所有消费者都可以访问true表示只有第一次拥有它的消费者才能访问// * 参数4是否自动删除true表示不再使用队列时自动删除队列// * 参数5其他额外参数// */channel.queueDeclare(simple_queue,false,false,false,null);//5.发送消息String mesghello rabbitmq;/*** 参数1交换机名表示默认交换机* 参数2路由键简单模式就是队列名* 参数3其他额外参数* 参数4要传递的消息字节数组*/channel.basicPublish(,simple_queue,null,mesg.getBytes());//6.关闭资源(信道和连接)channel.close();connection.close();System.out.println(发送成功);}
}
消费者代码实现 步骤 1.创建连接工厂ConnectionFactory 2.设置工厂参数 3.创建连接 4.创建信道 前四步代码基本是一致的需要注意的是生产者与消费者的Channel是不同Connection中的不是同一个对象. 5. 最简单的模型没有交换机exchange所以此处RabbitMQ会使用默认的交换机 6. 接收消息有一个回调方法 channel.basicConsume() 代码实现
package com.jjy.mq.simple;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Customer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.66.100);connectionFactory.setPort(5672);connectionFactory.setUsername(jjy);connectionFactory.setPassword(jjy);connectionFactory.setVirtualHost(/);//2.创建连接Connection connection connectionFactory.newConnection();//3.建立信道Channel channel connection.createChannel();//4.监听队列/*** 参数1监听的队列名* 参数2是否自动签收如果设置为false则需要手动确认消息已收到否则MQ会一直发送消息* 参数3Consumer的实现类重写该类方法表示接受到消息后如何消费*/channel.basicConsume(simple_queue,true,new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message new String(body,UTF-8);System.out.println(接受消息消息为message);}});//}
}
二. RabbitMQ 工作队列模式 与简单模式相比工作队列模式(Work Queue)多了一些消费者该 模式也使用direct交换机应用于处理消息较多的情况。特点如 下
一个队列对应多个消费者。一条消息只会被一个消费者消费。消息队列默认采用轮询的方式将消息平均发送给消费者
应用场景对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
生产者代码实现
代码实现
package com.jjy.mq.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.66.100);connectionFactory.setPort(5672);connectionFactory.setUsername(jjy);connectionFactory.setPassword(jjy);connectionFactory.setVirtualHost(/);// 2.创建连接Connection connection connectionFactory.newConnection();// 3.建立信道Channel channel connection.createChannel();// 4.创建队列持久化队列channel.queueDeclare(work_queue,true,false,false,null);// 5.发送大量消息参数3表示该消息为持久化消息即除了保存到内存还会保存到磁盘中for(int i0;i100;i){channel.basicPublish(,work_queue, MessageProperties.PERSISTENT_TEXT_PLAIN, (你好这是今天的第i条消息).getBytes());}// 6.关闭资源channel.close();connection.close();}
}
消费者代码实现 消费者1
package com.jjy.mq.work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Customer1 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.66.100);connectionFactory.setPort(5672);connectionFactory.setUsername(jjy);connectionFactory.setPassword(jjy);connectionFactory.setVirtualHost(/);//2.创建连接Connection connection connectionFactory.newConnection();//3.建立信道Channel channel connection.createChannel();// 4.监听队列处理消息channel.basicConsume(work_queue,true,new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message new String(body, UTF-8);System.out.println(消费者1消费消息消息为 message);}});}
}
消费者2
package com.jjy.mq.work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Customer2 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.66.100);connectionFactory.setPort(5672);connectionFactory.setUsername(jjy);connectionFactory.setPassword(jjy);connectionFactory.setVirtualHost(/);//2.创建连接Connection connection connectionFactory.newConnection();//3.建立信道Channel channel connection.createChannel();// 4.监听队列处理消息channel.basicConsume(work_queue,true,new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message new String(body, UTF-8);System.out.println(消费者2消费消息消息为 message);}});}}
消费者3
package com.jjy.mq.work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Customer3 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.66.100);connectionFactory.setPort(5672);connectionFactory.setUsername(jjy);connectionFactory.setPassword(jjy);connectionFactory.setVirtualHost(/);//2.创建连接Connection connection connectionFactory.newConnection();//3.建立信道Channel channel connection.createChannel();// 4.监听队列处理消息channel.basicConsume(work_queue,true,new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message new String(body, UTF-8);System.out.println(消费者3消费消息消息为 message);}});}}
三. RabbitMQ 发布订阅模式 P生产者也就是要发送消息的程序但是不再发送到队列中而是发给X交换机
C消费者消息的接收者会一直等待消息到来
Queue消息队列接收消息、缓存消息
在开发过程中有一些消息需要不同消费者进行不同的处理如电 商网站的同一条促销信息需要短信发送、邮件发送、站内信发送 等。此时可以使用发布订阅模式(Publish/Subscribe) 特点
生产者将消息发送给交换机交换机将消息转发到绑定此交换机的每个队列中。工作队列模式的交换机只能将消息发送给一个队列发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。
Exchange交换机X一方面接收生产者发送的消息。另一方面知道如何处理消息例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange的类型。Exchange有常见以下3种类型 ➢ Fanout广播将消息交给所有绑定到交换机的队列 ➢ Direct定向把消息交给符合指定routing key 的队列 ➢ Topic常用通配符把消息交给符合routing pattern路由模式的队列 Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与 Exchange 绑定或者没有符合路由规则的队列那么消息会丢失
生产者代码实现
与之前的步骤相比多了创建交换机和绑定交换机与队列的操作
代码实现
package com.jjy.mq.publish;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class produce {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.66.100);connectionFactory.setPort(5672);connectionFactory.setUsername(jjy);connectionFactory.setPassword(jjy);connectionFactory.setVirtualHost(/);//2.创建连接Connection connection connectionFactory.newConnection();//3.建立信道Channel channel connection.createChannel();//4.创建交换机/*exchangeDeclare(String exchange, -- 交换机的名称String type, -- 交换机的类型4种枚举(direct,fanout,topic,headers)boolean durable, -- 持久化boolean autoDelete, -- 自动删除boolean internal, -- 内部使用基本是falseMapString, Object arguments) -- 参数*/*** 参数1交换机名* 参数2交换机类型* 参数3交换机持久化*/channel.exchangeDeclare(exchange_fanout, BuiltinExchangeType.FANOUT,true);//5.创建队列//短信队列channel.queueDeclare(SEND_MAIL,true,false,false,null);//消息队列channel.queueDeclare(SEND_MESSAGE,true,false,false,null);//站内信息channel.queueDeclare(SEND_STATION,true,false,false,null);//6.交换机绑定队列/*** 参数1队列名* 参数2交换机名* 参数3路由关键字发布订阅模式写即可*/channel.queueBind(SEND_MAIL,exchange_fanout,);channel.queueBind(SEND_MESSAGE,exchange_fanout,);channel.queueBind(SEND_STATION,exchange_fanout,);//7.发送消息for (int i 1; i 10 ; i) {channel.basicPublish(exchange_fanout,,null,(你好尊敬的用户秒杀商品开抢了i).getBytes(StandardCharsets.UTF_8));}//8.关闭资源channel.close();connection.close();}
}
消费者代码实现
接下来编写三个消费者分别监听各自的队列。 //站内信消费者
package com.jjy.mq.publish;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 站内信消费者
public class CustomerStation {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.66.100);connectionFactory.setPort(5672);connectionFactory.setUsername(jjy);connectionFactory.setPassword(jjy);connectionFactory.setVirtualHost(/);// 默认虚拟机//2.创建连接Connection conn connectionFactory.newConnection();//3.建立信道Channel channel conn.createChannel();// 4.监听队列channel.basicConsume(SEND_STATION, true, new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message new String(body, utf-8);System.out.println(发送站内信message);}});}
}邮件消费者 package com.jjy.mq.publish;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMail {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.66.100);connectionFactory.setPort(5672);connectionFactory.setUsername(jjy);connectionFactory.setPassword(jjy);connectionFactory.setVirtualHost(/);// 默认虚拟机//2.创建连接Connection conn connectionFactory.newConnection();//3.建立信道Channel channel conn.createChannel();// 4.监听队列channel.basicConsume(SEND_MAIL, true, new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message new String(body, utf-8);System.out.println(发送邮件message);}});}
}短信消费者
package com.jjy.mq.publish;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMessage {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.66.100);connectionFactory.setPort(5672);connectionFactory.setUsername(jjy);connectionFactory.setPassword(jjy);connectionFactory.setVirtualHost(/);// 默认虚拟机//2.创建连接Connection conn connectionFactory.newConnection();//3.建立信道Channel channel conn.createChannel();// 4.监听队列channel.basicConsume(SEND_MESSAGE, true, new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message new String(body, utf-8);System.out.println(发送短信message);}});}
}
也可以使用工作队列发布订阅模式同时使用两个消费者同时监听 一个队列 // 短信消费者2
public class CustomerMessage2 {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.0.162);connectionFactory.setPort(5672);connectionFactory.setUsername(itbaizhan);connectionFactory.setPassword(itbaizhan);connectionFactory.setVirtualHost(/);// 默认虚拟机//2.创建连接Connection conn connectionFactory.newConnection();//3.建立信道Channel channel conn.createChannel();// 4.监听队列channel.basicConsume(SEND_MESSAGE, true, new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message new String(body, utf-8);System.out.println(发送短信2message);}});}
}两个不一样的系统对同一条消息做不一样的处理
发布订阅模式与工作队列模式的区别 1工作队列模式不用定义交换机而发布/订阅模式需要定义交换机
2发布/订阅模式的生产方是面向交换机发送消息工作队列模式的生产方是面向队列发送消息(底层使用 默认交换机)
3发布/订阅模式需要设置队列和交换机的绑定工作队列模式不需要设置实际上工作队列模式会将队列绑定到默认的交换机
四. RabbitMQ 路由模式 使用发布订阅模式时所有消息都会发送到绑定的队列中但很多 时候不是所有消息都无差别的发布到所有队列中。比如电商网站 的促销活动双十一大促可能会发布到所有队列而一些小的促销 活动为了节约成本只发布到站内信队列。此时需要使用路由模式 (Routing)完成这一需求。 特点
每个队列绑定路由关键字RoutingKey生产者将带有RoutingKey的消息发送给交换机交换机根据RoutingKey转发到指定队列。路由模 式使用direct交换机。
队列与交换机的绑定不能是任意绑定了而是要指定一个 RoutingKey路由key)
消息的发送方在向 Exchange 发送消息时也必须指定消息的 RoutingKey
Exchange 不再把消息交给每一个绑定的队列而是根据消息的 Routing Key 进行判断只有队列的 Routingkey 与消息的 Routing key 完全一致才会接收到消息
生产者代码实现
package com.jjy.mq.routing;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class produce {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.66.100);connectionFactory.setPort(5672);connectionFactory.setUsername(jjy);connectionFactory.setPassword(jjy);connectionFactory.setVirtualHost(/);//2.创建连接Connection connection connectionFactory.newConnection();//3.建立信道Channel channel connection.createChannel();//4.创建交换机/*** 参数1交换机名* 参数2交换机类型* 参数3交换机持久化*/channel.exchangeDeclare(exchange_routing, BuiltinExchangeType.DIRECT,true);// 5.创建队列channel.queueDeclare(SEND_MAIL2,true,false,false,null);channel.queueDeclare(SEND_MESSAGE2,true,false,false,null);channel.queueDeclare(SEND_STATION2,true,false,false,null);//6.交换机绑定队列/*** 参数1队列名* 参数2交换机名* 参数3路由关键字发布订阅模式写即可*/channel.queueBind(SEND_MAIL2,exchange_routing,import);channel.queueBind(SEND_MESSAGE2,exchange_routing,import);channel.queueBind(SEND_STATION2,exchange_routing,import);channel.queueBind(SEND_STATION2,exchange_routing,normal);//7.发送消息channel.basicPublish(exchange_routing,import,null,双十一大促活动.getBytes());channel.basicPublish(exchange_routing,normal,null,小型促销活动.getBytes());//8.关闭资源channel.close();connection.close();}
}
消费者代码实现
站内信消费者
package com.jjy.mq.routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 站内信消费者
public class CustomerStation {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.66.100);connectionFactory.setPort(5672);connectionFactory.setUsername(jjy);connectionFactory.setPassword(jjy);connectionFactory.setVirtualHost(/);// 默认虚拟机//2.创建连接Connection conn connectionFactory.newConnection();//3.建立信道Channel channel conn.createChannel();// 4.监听队列channel.basicConsume(SEND_STATION2, true, new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message new String(body, utf-8);System.out.println(发送站内信message);}});}
}短信消费者
package com.jjy.mq.routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMessage {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.66.100);connectionFactory.setPort(5672);connectionFactory.setUsername(jjy);connectionFactory.setPassword(jjy);connectionFactory.setVirtualHost(/);// 默认虚拟机//2.创建连接Connection conn connectionFactory.newConnection();//3.建立信道Channel channel conn.createChannel();// 4.监听队列channel.basicConsume(SEND_MESSAGE2, true, new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message new String(body, utf-8);System.out.println(发送短信message);}});}
}
邮件消费者
package com.jjy.mq.routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMail {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.66.100);connectionFactory.setPort(5672);connectionFactory.setUsername(jjy);connectionFactory.setPassword(jjy);connectionFactory.setVirtualHost(/);// 默认虚拟机//2.创建连接Connection conn connectionFactory.newConnection();//3.建立信道Channel channel conn.createChannel();// 4.监听队列channel.basicConsume(SEND_MAIL2, true, new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message new String(body, utf-8);System.out.println(发送邮件message);}});}
}
总的来说就一句话
Routing 模式要求队列在绑定交换机时要指定 routing key消息会转发到符合 routing key 的队列。
五. RabbitMQ 通配符模式 通配符模式(Topic)是在路由模式的基础上给队列绑定带通配符的 路由关键字只要消息的RoutingKey能实现通配符匹配就会将消 息转发到该队列。通配符模式比路由模式更灵活使用topic交换 机. 通配符规则
消息设置RoutingKey时RoutingKey由多个单词构成中间以 . 分割。队列设置RoutingKey时 # 可以匹配任意多个单词 * 可以匹配任意一个单词。
生产者代码实现 代码实现
package com.jjy.mq.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class produce {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.66.100);connectionFactory.setPort(5672);connectionFactory.setUsername(jjy);connectionFactory.setPassword(jjy);connectionFactory.setVirtualHost(/);//2.创建连接Connection connection connectionFactory.newConnection();//3.建立信道Channel channel connection.createChannel();//4.创建交换机/*** 参数1交换机名* 参数2交换机类型* 参数3交换机持久化*/channel.exchangeDeclare(exchange_topic, BuiltinExchangeType.TOPIC,true);// 5.创建队列channel.queueDeclare(SEND_MAIL3,true,false,false,null);channel.queueDeclare(SEND_MESSAGE3,true,false,false,null);channel.queueDeclare(SEND_STATION3,true,false,false,null);//6.交换机绑定队列channel.queueBind(SEND_MAIL3,exchange_topic,#.mail.#);channel.queueBind(SEND_MESSAGE3,exchange_topic,#.message.#);channel.queueBind(SEND_STATION3,exchange_topic,#.station.#);//7.发送消息channel.basicPublish(exchange_topic,mail.message.station,null,双十一大促活动.getBytes());channel.basicPublish(exchange_topic,station,null,小型促销活动.getBytes());//8.关闭资源channel.close();connection.close();}
}
消费者代码实现
站内信消费者
package com.jjy.mq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 站内信消费者
public class CustomerStation {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.66.100);connectionFactory.setPort(5672);connectionFactory.setUsername(jjy);connectionFactory.setPassword(jjy);connectionFactory.setVirtualHost(/);// 默认虚拟机//2.创建连接Connection conn connectionFactory.newConnection();//3.建立信道Channel channel conn.createChannel();// 4.监听队列channel.basicConsume(SEND_STATION3, true, new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message new String(body, utf-8);System.out.println(发送站内信message);}});}
}短信消费者
package com.jjy.mq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMessage {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.66.100);connectionFactory.setPort(5672);connectionFactory.setUsername(jjy);connectionFactory.setPassword(jjy);connectionFactory.setVirtualHost(/);// 默认虚拟机//2.创建连接Connection conn connectionFactory.newConnection();//3.建立信道Channel channel conn.createChannel();// 4.监听队列channel.basicConsume(SEND_MESSAGE3, true, new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message new String(body, utf-8);System.out.println(发送短信message);}});}
}邮件消费者
package com.jjy.mq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMail {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.66.100);connectionFactory.setPort(5672);connectionFactory.setUsername(jjy);connectionFactory.setPassword(jjy);connectionFactory.setVirtualHost(/);// 默认虚拟机//2.创建连接Connection conn connectionFactory.newConnection();//3.建立信道Channel channel conn.createChannel();// 4.监听队列channel.basicConsume(SEND_MAIL3, true, new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message new String(body, utf-8);System.out.println(发送邮件message);}});}
}
总述topics模式比routing模式要更加灵活笼统的说就是routing模式加上通配符
如果我的内容对你有帮助请点赞评论收藏。创作不易大家的支持就是我坚持下去的动力