推广 高端网站建设,网站建设大型企业,网页录制视频教程,Wordpress报价主题简单模式
我们以最普通的方式去理解#xff0c;并没有整合Springboot的那种
这是最简单的模式#xff0c;一个生产者#xff0c;一个消费者#xff0c;一个队列 测试
1、 导包#xff0c;没整合#xff0c;不需要编写配置
2、需要生产者消费者
导包
dependency…简单模式
我们以最普通的方式去理解并没有整合Springboot的那种
这是最简单的模式一个生产者一个消费者一个队列 测试
1、 导包没整合不需要编写配置
2、需要生产者消费者
导包
dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.10.0/version
/dependencyProducer
public class Producer {public static void main(String[] args) {//ip port//创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//创建连接工程connectionFactory.setHost(47.120.50.213);connectionFactory.setPort(5672);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);connectionFactory.setVirtualHost(/);//创建连接connectionConnection connection null;Channel channel null;try {connection connectionFactory.newConnection(producer);//通过连接获取通道Channelchannel connection.createChannel();//通过创建交换机声明队列绑定关系路由key发送接收消息String queueName queue;/*** 队列的名称* 是否要持久化* 排他性是否独占独立* 是否自动删除在最后一个消费者消费完后* 携带附属参数*/channel.queueDeclare(queueName,false,false,false,null);String message hello world;//发送消息到消息队列channel.basicPublish(,queueName,null,message.getBytes());System.out.println(消息发送成功);} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}finally {//关闭连接if(channel ! null channel.isOpen()){try {channel.close();} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}if(connection ! null connection.isOpen()){try {connection.close();} catch (IOException e) {throw new RuntimeException(e);}}}}
}Consumer
public class Consumer {public static void main(String[] args) {//ip port//创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//创建连接工程connectionFactory.setHost(47.120.50.213);connectionFactory.setPort(5672);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);connectionFactory.setVirtualHost(/);//创建连接connectionConnection connection null;Channel channel null;try {connection connectionFactory.newConnection(producer);//通过连接获取通道Channelchannel connection.createChannel();//第一个是消息队列的名字channel.basicConsume(queue, true, new DeliverCallback() {Overridepublic void handle(String s, Delivery message) throws IOException {System.out.println(收到的消息的是new String(message.getBody(),UTF-8));}},new CancelCallback() {Overridepublic void handle(String s) throws IOException {System.out.println(接收消息失败);}});System.out.println(开始接收消息);System.in.read();} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}finally {//关闭连接if(channel ! null channel.isOpen()){try {channel.close();} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}if(connection ! null connection.isOpen()){try {connection.close();} catch (IOException e) {throw new RuntimeException(e);}}}}
}总结
代码流程 上述消息没有设置为持久化 没持久化消息创建了依旧存在除非服务器重启就会删除 持久化服务器重启后都不会删除 发送消息 channel.queueDeclare(queueName,false,false,false,null);
String message hello world;
//发送消息到消息队列
channel.basicPublish(,queueName,null,message.getBytes());接收消息 channel.basicConsume(queue, true, new DeliverCallback() {Overridepublic void handle(String s, Delivery message) throws IOException {System.out.println(收到的消息的是new String(message.getBody(),UTF-8));}
},new CancelCallback() {Overridepublic void handle(String s) throws IOException {System.out.println(接收消息失败);}
}
);问题 1、连接超时 这里可能是NO access 点击admin修改 命令方式给用户分配权限
rabbitmqctl set_permissions -p / admin * .* .* 给用户分配权限发现并没有解决问题
访问的端口时5672因为15672是给web访问的所以需要访问5672需要开通安全组与端口号即567215672都需要开启
#开启端口
[rootiZf8zhsqf64x47n1tpdy6oZ rabbitmq]# firewall-cmd --zonepublic --add-port15672/tcp --permanent
#重启防火墙
firewall-cmd --reload
#需要开启远程安全组思考
为什么基于channel而不是连接
一个应用有多个线程需要从rabbitmq中消费或是生产消息那么必然会建立很多个connection ,也就是多个tcp连接对操作系统而言建立和销毁tcp连接是很昂贵的开销如果遇到使用高峰性能瓶颈也随之显现rabbitmq采用类似nio的做法连接tcp连接复用不仅可以减少性能开销同时也便于管理