建立学校网站需要多少钱?,北京交易网站建设,廊坊seo管理,a5源码网站#RabbitMQ 监控(三)验证RabbitMQ健康运行只是确保消息通信架构可靠性的一部分#xff0c;同时#xff0c;你也需要确保消息通信结构配置没有遭受意外修改#xff0c;从而避免应用消息丢失。RabbitMQ Management HTTP API提供了一个方法允许你查看任何vhost上的任何队列…#RabbitMQ 监控(三)验证RabbitMQ健康运行只是确保消息通信架构可靠性的一部分同时你也需要确保消息通信结构配置没有遭受意外修改从而避免应用消息丢失。RabbitMQ Management HTTP API提供了一个方法允许你查看任何vhost上的任何队列/api/queues//。你不仅可以查看配置详情还可以查看队列的数据统计例如队列消耗的内存或者队列的平均消息吞吐量。使用curl测试一下该API这里的/%2F还是代表默认的vhost(/)。curl -u guest:guest http://127.0.0.1:15672/api/queues/%2F/springrabbitexerciseresponse{consumer_details: [{channel_details: {peer_host: 127.0.0.1,peer_port: 62679,connection_name: 127.0.0.1:62679 - 127.0.0.1:5672,user: guest,number: 2,node: rabbitlocalhost,name: 127.0.0.1:62679 - 127.0.0.1:5672 (2)},arguments: [],prefetch_count: 1,ack_required: true,exclusive: false,consumer_tag: amq.ctag-YImeU8Fm_VahDpxv8EAw2Q,queue: {vhost: /,name: springrabbitexercise}}],messages_details: {rate: 7357},messages: 232517,messages_unacknowledged_details: {rate: 0.2},messages_unacknowledged: 5,messages_ready_details: {rate: 7356.8},messages_ready: 232512,reductions_details: {rate: 1861021.8},reductions: 58754154,...auto_delete: false,durable: true,vhost: /,name: springrabbitexercise,message_bytes_persistent: 2220250,message_bytes_ram: 2220250,message_bytes_unacknowledged: 40,message_bytes_ready: 2220210,message_bytes: 2220250,messages_persistent: 232517,messages_unacknowledged_ram: 5,messages_ready_ram: 232512,messages_ram: 232517,garbage_collection: {minor_gcs: 0,fullsweep_after: 65535,min_heap_size: 233,min_bin_vheap_size: 46422,max_heap_size: 0},state: running}为了方便阅读去掉了部分返回值但是还是可以看到队列的很多信息。例如可以看到一个consumer的信息、消息占用的内存、队列的durable、auto_delete属性等。利用这些配置信息新的健康监控程序可以通过API方法的输出来轻松监控队列的属性并在发生变更时通知你。就像之前编写健康检测程序那样除了服务器、端口、vhost、用户名和密码之外还需要知道* 队列的名称以便监控其配置* 该队列是否将durable和auto_delete选项打开###清单3.1 检测队列配置完整代码在我的github下面代码中的Data和Slf4j都是插件lombok中的注解想要了解的可自行百度。1.定义查看队列信息的接口 RMQResource.javaPath(api)Consumes({MediaType.APPLICATION_JSON})Produces({MediaType.APPLICATION_JSON})public interface RMQResource {/*** Return a queues info** param vhost* param name* return {link QueueInfo}*/GETPath(queues/{vhost}/{name})Response getQueueInfo(PathParam(vhost) String vhost, PathParam(name) String name);}2.定义查看队列接口的返回值 QueueInfo.javaDatapublic class QueueInfo {private ConsumerDetails[] consumer_details;/*** unknown class*/JsonIgnoreprivate Object[] incoming;/*** unknown class*/JsonIgnoreprivate Object[] deliveries;/*** unknown class*/JsonIgnoreprivate Object arguments;private Boolean exclusive;//...private Boolean auto_delete;private Boolean durable;private String vhost;private String name;/*** unknown class*/JsonIgnoreprivate Object head_message_timestamp;/*** unknown class*/JsonIgnoreprivate Object recoverable_slaves;private Long memory;private Double consumer_utilisation;private Integer consumers;/*** unknown class*/JsonIgnoreprivate Object exclusive_consumer_tag;/*** unknown class*/JsonIgnoreprivate Object policy;JsonFormat(pattern yyyy-MM-dd hh:mm:ss)private Date idle_since;}3.检测队列配置 QueueConfigCheck.java/*** 检测队列配置*/Slf4jpublic class QueueConfigCheck {private final static RMQResource rmqResource RMQApi.getService(RMQResource.class);public static void checkQueueConfig(String vhost, CheckQueue queue) {RMQConfig config RMQConfig.Singleton.INSTANCE.getRmqConfig();String host config.getHost();Response response null;try {response rmqResource.getQueueInfo(vhost, queue.getQueue_name());} catch (Exception e) {log.error(UNKNOWN: Could not connect to {}, cause {}, host, e.getMessage());ExitUtil.exit(ExitType.UNKNOWN.getValue());}if (response null || response.getStatus() 404) {log.error(CRITICAL: Queue {} does not exist., queue.getQueue_name());ExitUtil.exit(ExitType.CRITICAL.getValue());} else if (response.getStatus() 299) {log.error(UNKNOWN: Unexpected API error : {}, response);ExitUtil.exit(ExitType.UNKNOWN.getValue());} else {QueueInfo info response.readEntity(QueueInfo.class);if (!info.getAuto_delete().equals(queue.getAuto_delete())) {log.warn(WARN: Queue {} - auto_delete flag is NOT {}, queue.getQueue_name(), info.getAuto_delete());ExitUtil.exit(ExitType.WARN.getValue());}if (!info.getDurable().equals(queue.getDurable())) {log.warn(WARN: Queue {} - durable flag is NOT {}, queue.getQueue_name(), info.getDurable());ExitUtil.exit(ExitType.WARN.getValue());}}log.info(OK: Queue {} configured correctly., queue.getQueue_name());ExitUtil.exit(ExitType.OK.getValue());}}4.检测队列配置的方法参数 CheckQueue.javaDatapublic class CheckQueue {private final String queue_name;private final Boolean auto_delete;private final Boolean durable;public CheckQueue(String queue_name, Boolean auto_delete, Boolean durable) {this.queue_name queue_name;this.auto_delete auto_delete;this.durable durable;}}5.运行检测程序Testpublic void testQueueConfig() {String queue_name springrabbitexercise;Boolean auto_delete false;Boolean durable true;String vhost /;CheckQueue queue new CheckQueue(queue_name, auto_delete, durable);QueueConfigCheck.checkQueueConfig(vhost, queue);}可以看到监控正常运行11:38:23.286 [main] INFO com.lanxiang.rabbitmqmonitor.check.QueueConfigCheck - OK: Queue springrabbitexercise configured correctly.11:38:23.289 [main] INFO com.lanxiang.rabbitmqmonitor.terminate.ExitUtil - Status is OK这段RabbitMQ队列检测的程序有一处修改如果健康检测程序无法连接到API服务器的话会返回EXIT_UNKNOWN。前一章的API ping健康检测要么成功要么失败故障代码之间没有区别但是队列检测API方法在失败时通过HTTP状态码提供了更多信息。如果HTTP状态码是404就代表尝试验证的队列不存在检测失败并返回EXIT_CRITICAL。对于其他大于299的HTTP状态码退出代码为EXIT_UNKNOWN。在获取到RabbitMQ API的response之后使用JSON进行解码并且把得到的durable和auto_delete参数与期望的参数进行比较如果参数和预期不相符的话返回EXIT_WARNING或者EXIT_CRITICAL状态码。如果队列所有的配置都正确的话那么就正确退出。在了解我们对RabbitMQ做监控的原理之后可以根据RabbitMQ Management HTTP API定制更多的监控例如* /api/nodes可以获取集群中每个节点的数据* /api/queues//可以获取队列的详细情况例如消息处理的速率、积压的消息数量等。除此之外还有许多其他API我们要做的就是根据自身的业务逻辑和这些API来设计合理的监控脚本。RabbitMQ监控系列就到此结束啦还是很可惜没有实战的机会吧因为最近在工作变动期间看了一下RabbitMQ实战这本书兴起想写一下博客试试。毕业快一年了想养成写博客的习惯。正好最近也在工作变动中能有闲暇时间尝试一下博客写的比较水多多包涵。