广州网站推广电话,wordpress 首页大图,wordpress如何更新缓存,推广网络应用场景
之前我们已经通过《Spring Cloud Stream消费失败后的处理策略#xff08;一#xff09;#xff1a;自动重试》一文介绍了Spring Cloud Stream默认的消息重试功能。本文将介绍RabbitMQ的binder提供的另外一种重试功能#xff1a;重新入队。
动手试试
准备一个会…应用场景
之前我们已经通过《Spring Cloud Stream消费失败后的处理策略一自动重试》一文介绍了Spring Cloud Stream默认的消息重试功能。本文将介绍RabbitMQ的binder提供的另外一种重试功能重新入队。
动手试试
准备一个会消费失败的例子可以直接沿用前文的工程也可以新建一个然后创建如下代码的逻辑
EnableBinding(TestApplication.TestTopic.class)SpringBootApplicationpublic class TestApplication { public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); } RestController static class TestController { Autowired private TestTopic testTopic; /** * 消息生产接口 * * param message * return */ GetMapping(/sendMessage) public String messageWithMQ(RequestParam String message) { testTopic.output().send(MessageBuilder.withPayload(message).build()); return ok; } } /** * 消息消费逻辑 */ Slf4j Component static class TestListener { private int count 1; StreamListener(TestTopic.INPUT) public void receive(String payload) { log.info(Received payload : payload , count); throw new RuntimeException(Message consumer failed!); } } interface TestTopic { String OUTPUT example-topic-output; String INPUT example-topic-input; Output(OUTPUT) MessageChannel output(); Input(INPUT) SubscribableChannel input(); }}内容很简单既包含了消息的生产也包含了消息消费。消息消费的时候主动抛出了一个异常来模拟消息的消费失败。
在启动应用之前还要记得配置一下输入输出通道对应的物理目标exchange或topic名、并设置一下分组比如
spring.cloud.stream.bindings.example-topic-input.destinationtest-topicspring.cloud.stream.bindings.example-topic-input.groupstream-exception-handlerspring.cloud.stream.bindings.example-topic-input.consumer.max-attempts1spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejectedtruespring.cloud.stream.bindings.example-topic-output.destinationtest-topic完成了上面配置之后启动应用并访问localhost:8080/sendMessage?messagehello接口来发送一个消息到MQ中了此时可以看到程序不断的抛出了消息消费异常。这是由于这里我们多加了一个配置spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejectedtrue。在该配置作用之下消息消费失败之后并不会将该消息抛弃而是将消息重新放入队列所以消息的消费逻辑会被重复执行直到这条消息消费成功为止。
深入思考
在完成了上面的这个例子之后可能读者会有下面两个常见问题
问题一之前介绍的Spring Cloud Stream默认提供的默认功能spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts与本文所说的重入队列实现的重试有什么区别
Spring Cloud Stream默认提供的默认功能只是对处理逻辑的重试它们的处理逻辑是由同一条消息触发的。而本文所介绍的重新入队史通过重新将消息放入队列而触发的所以实际上是收到了多次消息而实现的重试。
问题二如上面的例子那样消费一直不成功这些不成功的消息会被不断堆积起来如何解决这个问题
对于这个问题我们可以联合前文介绍的DLQ队列来完善消息的异常处理。
我们只需要增加如下配置自动绑定dlq队列
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlqtrue然后改造一下消息处理程序可以根据业务情况为进入dlq队列增加一个条件比如下面的例子
StreamListener(TestTopic.INPUT)public void receive(String payload) { log.info(Received payload : payload , count); if (count 3) { count 1; throw new AmqpRejectAndDontRequeueException(tried 3 times failed, send to dlq!); } else { count ; throw new RuntimeException(Message consumer failed!); }}设定了计数器count当count为3的时候抛出AmqpRejectAndDontRequeueException这个特定的异常。此时当只有当抛出这个异常的时候才会将消息放入DLQ队列从而不会造成严重的堆积问题。
代码示例
本文示例读者可以通过查看下面仓库的中的stream-exception-handler-4项目
GithubGitee
如果您对这些感兴趣欢迎star、follow、收藏、转发给予支持
以下专题教程也许您会有兴趣
Spring Boot基础教程Spring Cloud基础教程