当前位置: 首页 > news >正文

成都微信网站建设报价单网站运营与管理试卷

成都微信网站建设报价单,网站运营与管理试卷,动漫是如何制作出来的,类似 wordpress 建站生产端 Confirm 消息确认机制消息的确认#xff0c;是指生产者投递消息后#xff0c;如果 Broker 收到消息#xff0c;则会给我们生产者一个应答。生产者进行接收应答#xff0c;用来确定这条消息是否正常的发送到 Broker #xff0c;这种方式也是消息的可靠性投递的核心保…生产端 Confirm 消息确认机制消息的确认是指生产者投递消息后如果 Broker 收到消息则会给我们生产者一个应答。生产者进行接收应答用来确定这条消息是否正常的发送到 Broker 这种方式也是消息的可靠性投递的核心保障!Confirm 确认机制流程图如何实现Confirm确认消息?第一步:在 channel 上开启确认模式: channel.confirmSelect()第二步:在 channel 上添加监听: channel.addConfirmListener(ConfirmListener listener);, 监听成功和失败的返回结果根据具体的结果对消息进行重新发送、或记录日志等后续处理!import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConfirmListener;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;public class ConfirmProducer {public static void main(String[] args) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setVirtualHost(/);factory.setUsername(guest);factory.setPassword(guest);Connection connection factory.newConnection();Channel channel connection.createChannel();String exchangeName test_confirm_exchange;String routingKey item.update;//指定消息的投递模式confirm 确认模式channel.confirmSelect();//发送final long start System.currentTimeMillis();for (int i 0; i 5 ; i) {String msg this is confirm msg ;channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());System.out.println(Send message : msg);}//添加一个确认监听 这里就不关闭连接了为了能保证能收到监听消息channel.addConfirmListener(new ConfirmListener() {/*** 返回成功的回调函数*/public void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println(succuss ack);System.out.println(multiple);System.out.println(耗时 (System.currentTimeMillis() - start) ms);}/*** 返回失败的回调函数*/public void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.printf(defeat ack);System.out.println(耗时 (System.currentTimeMillis() - start) ms);}});}}import com.rabbitmq.client.*;import java.io.IOException;public class ConfirmConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setVirtualHost(/);factory.setUsername(guest);factory.setPassword(guest);factory.setAutomaticRecoveryEnabled(true);factory.setNetworkRecoveryInterval(3000);Connection connection factory.newConnection();Channel channel connection.createChannel();String exchangeName test_confirm_exchange;String queueName test_confirm_queue;String routingKey item.#;channel.exchangeDeclare(exchangeName, topic, true, false, null);channel.queueDeclare(queueName, false, false, false, null);//一般不用代码绑定在管理界面手动绑定channel.queueBind(queueName, exchangeName, routingKey);//创建消费者并接收消息Consumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String message new String(body, UTF-8);System.out.println( [x] Received message );}};//设置 Channel 消费者绑定队列channel.basicConsume(queueName, true, consumer);}}我们此处只关注生产端输出消息Send message : this is confirm msgSend message : this is confirm msgSend message : this is confirm msgSend message : this is confirm msgSend message : this is confirm msgsuccuss acktrue耗时3mssuccuss acktrue耗时4ms注意事项我们采用的是异步 confirm 模式提供一个回调方法服务端 confirm 了一条或者多条消息后 Client 端会回调这个方法。除此之外还有单条同步 confirm 模式、批量同步 confirm 模式由于现实场景中很少使用我们在此不做介绍如有兴趣直接参考官方文档。我们运行生产端会发现每次运行结果都不一样,会有多种情况出现因为 Broker 会进行优化有时会批量一次性 confirm 有时会分开几条 confirm。succuss acktrue耗时3mssuccuss ackfalse耗时4ms或者succuss acktrue耗时3msReturn 消息机制Return Listener 用于处理一-些不可路 由的消息!消息生产者通过指定一个 Exchange 和 Routingkey把消息送达到某一个队列中去然后我们的消费者监听队列进行消费处理操作!但是在某些情况下如果我们在发送消息的时候当前的 exchange 不存在或者指定的路由 key 路由不到这个时候如果我们需要监听这种不可达的消息就要使用 Return Listener !在基础API中有一个关键的配置项:Mandatory如果为 true则监听器会接收到路由不可达的消息然后进行后续处理如果为 false那么 broker 端自动删除该消息!Return 消息机制流程图Return 消息示例首先我们需要发送三条消息并且故意将第 0 条消息的 routing Key设置为错误的让他无法正常路由到消费端。mandatory 设置为 true 路由不可达的消息会被监听到不会被自动删除.即channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());最后添加监听即可监听到不可路由到消费端的消息channel.addReturnListener(ReturnListener r))import com.rabbitmq.client.*;import java.io.IOException;public class ReturnListeningProducer {public static void main(String[] args) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setVirtualHost(/);factory.setUsername(guest);factory.setPassword(guest);Connection connection factory.newConnection();Channel channel connection.createChannel();String exchangeName test_return_exchange;String routingKey item.update;String errRoutingKey error.update;//指定消息的投递模式confirm 确认模式channel.confirmSelect();//发送for (int i 0; i 3 ; i) {String msg this is return——listening msg ;//param mandatory 设置为 true 路由不可达的消息会被监听到不会被自动删除if (i 0) {channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());} else {channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());}System.out.println(Send message : msg);}//添加一个确认监听 这里就不关闭连接了为了能保证能收到监听消息channel.addConfirmListener(new ConfirmListener() {/*** 返回成功的回调函数*/public void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println(succuss ack);}/*** 返回失败的回调函数*/public void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.printf(defeat ack);}});//添加一个 return 监听channel.addReturnListener(new ReturnListener() {public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(return relyCode: replyCode);System.out.println(return replyText: replyText);System.out.println(return exchange: exchange);System.out.println(return routingKey: routingKey);System.out.println(return properties: properties);System.out.println(return body: new String(body));}});}}import com.rabbitmq.client.*;import java.io.IOException;public class ReturnListeningConsumer {public static void main(String[] args) throws Exception {//1. 创建一个 ConnectionFactory 并进行设置ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setVirtualHost(/);factory.setUsername(guest);factory.setPassword(guest);factory.setAutomaticRecoveryEnabled(true);factory.setNetworkRecoveryInterval(3000);//2. 通过连接工厂来创建连接Connection connection factory.newConnection();//3. 通过 Connection 来创建 ChannelChannel channel connection.createChannel();//4. 声明String exchangeName test_return_exchange;String queueName test_return_queue;String routingKey item.#;channel.exchangeDeclare(exchangeName, topic, true, false, null);channel.queueDeclare(queueName, false, false, false, null);//一般不用代码绑定在管理界面手动绑定channel.queueBind(queueName, exchangeName, routingKey);//5. 创建消费者并接收消息Consumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String message new String(body, UTF-8);System.out.println( [x] Received message );}};//6. 设置 Channel 消费者绑定队列channel.basicConsume(queueName, true, consumer);}}我们只关注生产端结果消费端只收到两条消息。Send message : this is return——listening msgSend message : this is return——listening msgSend message : this is return——listening msgreturn relyCode: 312return replyText: NO_ROUTEreturn exchange: test_return_exchangereturn routingKey: error.updatereturn properties: #contentHeader(content-typenull, content-encodingnull, headersnull, delivery-modenull, prioritynull, correlation-idnull, reply-tonull, expirationnull, message-idnull, timestampnull, typenull, user-idnull, app-idnull, cluster-idnull)return body: this is return——listening msgsuccuss acksuccuss acksuccuss ack消费端 Ack 和 Nack 机制消费端进行消费的时候如果由于业务异常我们可以进行日志的记录然后进行补偿!如果由于服务器宕机等严重问题那我们就需要手工进行ACK保障消费端消费成功!消费端重回队列是为了对没有处理成功的消息把消息重新会递给Broker!一般我们在实际应用中都会关闭重回队列也就是设置为False。参考 apivoid basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;void basicAck(long deliveryTag, boolean multiple) throws IOException;如何设置手动 Ack 、Nack 以及重回队列首先我们发送五条消息将每条消息对应的循环下标 i 放入消息的 properties 中作为标记以便于我们在后面的回调方法中识别。其次 我们将消费端的 ·channel.basicConsume(queueName, false, consumer); 中的 autoAck属性设置为 false如果设置为true的话 将会正常输出五条消息。我们通过 Thread.sleep(2000)来延时一秒用以看清结果。我们获取到properties中的num之后通过channel.basicNack(envelope.getDeliveryTag(), false, true);将 num为0的消息设置为 nack即消费失败并且将 requeue属性设置为true即消费失败的消息重回队列末端。import com.rabbitmq.client.*;import java.util.HashMap;import java.util.Map;public class AckAndNackProducer {public static void main(String[] args) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setVirtualHost(/);factory.setUsername(guest);factory.setPassword(guest);Connection connection factory.newConnection();Channel channel connection.createChannel();String exchangeName test_ack_exchange;String routingKey item.update;String msg this is ack msg;for (int i 0; i 5; i) {Map headers new HashMap();headers.put(num ,i);AMQP.BasicProperties properties new AMQP.BasicProperties().builder().deliveryMode(2).headers(headers).build();String tem msg : i;channel.basicPublish(exchangeName, routingKey, true, properties, tem.getBytes());System.out.println(Send message : msg);}channel.close();connection.close();}}import com.rabbitmq.client.*;import java.io.IOException;public class AckAndNackConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setVirtualHost(/);factory.setUsername(guest);factory.setPassword(guest);factory.setAutomaticRecoveryEnabled(true);factory.setNetworkRecoveryInterval(3000);Connection connection factory.newConnection();final Channel channel connection.createChannel();String exchangeName test_ack_exchange;String queueName test_ack_queue;String routingKey item.#;channel.exchangeDeclare(exchangeName, topic, true, false, null);channel.queueDeclare(queueName, false, false, false, null);//一般不用代码绑定在管理界面手动绑定channel.queueBind(queueName, exchangeName, routingKey);Consumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String message new String(body, UTF-8);System.out.println( [x] Received message );try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}if ((Integer) properties.getHeaders().get(num) 0) {channel.basicNack(envelope.getDeliveryTag(), false, true);} else {channel.basicAck(envelope.getDeliveryTag(), false);}}};//6. 设置 Channel 消费者绑定队列channel.basicConsume(queueName, false, consumer);}}我们此处只关心消费端输出可以看到第 0 条消费失败重新回到队列尾部消费。[x] Received this is ack msg:1[x] Received this is ack msg:2[x] Received this is ack msg:3[x] Received this is ack msg:4[x] Received this is ack msg:0[x] Received this is ack msg:0[x] Received this is ack msg:0[x] Received this is ack msg:0[x] Received this is ack msg:0
http://www.zqtcl.cn/news/86735/

相关文章:

  • 本地主机做网站服务器浙江同凯建设深圳公司
  • 电影影视网站模板免费下载线上推广专员是干嘛的
  • 网站备案相关前置许可平面设计必学软件
  • 海建网站网站建设公司如何运营
  • 爬虫 做资讯网站深圳营销培训班
  • 网站制作前需要进行规划设计个人作品主页wordpress
  • 东莞樟木头网站制作html5网页设计工具
  • 长春网站公司有哪些内容aoc24g2色域
  • 湖北建设厅网站哔哩哔哩视频推广
  • 电子商务网站有哪些?网站建设难点
  • 制作外贸网站公司上海新闻最新消息
  • wordpress登录栏绍兴seo排名收费
  • 西安seo网站关键词廊坊网站seo
  • 长沙网站定制公司深圳网站建设推广
  • 网站建设资讯wordpress模板文件命名
  • 北京市门头沟有没有做网站的高水平高职建设网站
  • 徐州有哪些制作网站的公司手机网址是什么
  • 石油网站编辑怎么做注册公司流程及费用查询
  • 如何做家教网站建立自己的网站有什么用
  • python做网站模板手机网站域名
  • 甘肃张掖网站建设怎么做跑腿网站
  • 北京网站优化解决方案自己买个服务器开传奇
  • 网站开发运行环境论文平台网站开发公司组织架构
  • 眉山网站开发官方传奇游戏
  • 如何看网站开发语言宣传网站怎么做
  • 企业怎么做网站文档下载网站 建设
  • 如何用rp做网站大连网络营销公司哪家好
  • 网站设计公司销售渠道建设百度搜索软件
  • 网站空间哪里的好做电影网站危险吗
  • 网站建设公司使用图片侵权使用者有无责任wordpress po修改