西安专业做网站,成都 网站建设培训班,免费网站域名和空间,c 语言做网站在[八]RabbitMQ-客户端源码之ChannelN中讲述basicConsume的方法时设计到Consumer这个回调函数#xff0c;Consumer其实是一个接口#xff0c;真正实现它的是QueueingConsumer和DefaultConsumer#xff0c;且DefaultConsumer是QueueingConsumer的父类#xff0c;里面都是空方… 在[八]RabbitMQ-客户端源码之ChannelN中讲述basicConsume的方法时设计到Consumer这个回调函数Consumer其实是一个接口真正实现它的是QueueingConsumer和DefaultConsumer且DefaultConsumer是QueueingConsumer的父类里面都是空方法。在用户使用时可以简单的采用QueueingConsumer或者采用DefaultConsumer来重写某些方法。 这里先来看下消费者客户端的关键代码 QueueingConsumer consumer new QueueingConsumer(channel);channel.basicQos(32);channel.basicConsume(QUEUE_NAME, false, consumer_zzh,consumer)while (true) {QueueingConsumer.Delivery delivery consumer.nextDelivery();String message new String(delivery.getBody());System.out.println( [X] Received message );channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);} 可以看到QueueingConsumer作为channel.basicConsume的回调函数之后再进行处理。 在AMQConnection中有关MainLoop的主线程专门用来”第一线”的处理Broker发送回客户端从帧。当Basic.Consume/.ConsumeOk开启消费模式之后Broker主动的向客户端发送Basic.Delivery帧MainLoop线程一步步的调用最后到ChannelN的processAsync()方法中有 if (method instanceof Basic.Deliver) {processDelivery(command, (Basic.Deliver) method);return true;
} 之后调用processDelivery方法 protected void processDelivery(Command command, Basic.Deliver method) {Basic.Deliver m method;Consumer callback _consumers.get(m.getConsumerTag());if (callback null) {if (defaultConsumer null) {throw new IllegalStateException(Unsolicited delivery - see Channel.setDefaultConsumer to handle this case.);}else {callback defaultConsumer;}}Envelope envelope new Envelope(m.getDeliveryTag(), m.getRedelivered(),m.getExchange(),m.getRoutingKey());try {this.dispatcher.handleDelivery(callback, m.getConsumerTag(),envelope, (BasicProperties) command.getContentHeader(),command.getContentBody());} catch (Throwable ex) {getConnection().getExceptionHandler().handleConsumerException(this, ex,callback,m.getConsumerTag(), handleDelivery);}
} 这个方法首先根据consumerTag从ChannelN中的_consumer这个HashMap中获取相应的Consumer回调函数然后调用这个回调函数的handleDeliver()方法进行处理这里有些同学会有疑问明明是调用ConsumerDispatcher dispatcher的handleDeliver()方法其实这里只是包了一层皮ConsumerDispatcher的handleDeliver()方法就是调用了Consumer的handleDeliver()方法。 我们接下去看看QueueingConsumer这个实现Consumer接口的类是怎么处理的 Override public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException
{checkShutdown();this._queue.add(new Delivery(envelope, properties, body));
} 这里的queue就是一个LinkedBlockingQueue客户端程序通过调用nextDelivery()方法来获取数据 public Delivery nextDelivery()throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
{return handle(_queue.take());
}private Delivery handle(Delivery delivery) {if (delivery POISON ||delivery null (_shutdown ! null || _cancelled ! null)) {if (delivery POISON) {_queue.add(POISON);if (_shutdown null _cancelled null) {throw new IllegalStateException(POISON in queue, but null _shutdown and null _cancelled. This should never happen, please report as a BUG);}}if (null ! _shutdown)throw Utility.fixStackTrace(_shutdown);if (null ! _cancelled)throw Utility.fixStackTrace(_cancelled);}return delivery;
} 这个nextDelivery方法说白就是一个LinkedBlockingQueue的take()操作也就是一个可能会阻塞等待的操作。 附本系列全集 [Conclusion]RabbitMQ-客户端源码之总结[一]RabbitMQ-客户端源码之ConnectionFactory[二]RabbitMQ-客户端源码之AMQConnection[三]RabbitMQ-客户端源码之ChannelManager[四]RabbitMQ-客户端源码之Frame[五]RabbitMQ-客户端源码之AMQChannel[六]RabbitMQ-客户端源码之AMQCommand[七]RabbitMQ-客户端源码之AMQPImplMethod[八]RabbitMQ-客户端源码之ChannelN[九]RabbitMQ-客户端源码之Consumer