Default consumer exception handling
By default, if the application does no explicit exception handling, spring-kafka installs a default DefaultErrorHandler, which retries with a FixedBackOff: up to 9 back-to-back retries with no delay, so a single message is consumed at most 10 times. Once the retries are exhausted, the handler logs the exception to the console and commits the offset, meaning the message is effectively dropped.
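To make the delivery count concrete, here is a plain-Java sketch (no Spring involved; `deliver` and the always-failing handler are hypothetical stand-ins) of how `maxAttempts = 9` retries turn into 10 deliveries of the same record:

```java
import java.util.function.Consumer;

public class DefaultRetrySketch {

    /**
     * Delivers a record to a handler, retrying on failure.
     * maxAttempts counts retries, so total deliveries = maxAttempts + 1,
     * mirroring FixedBackOff's semantics in spring-kafka's DefaultErrorHandler.
     */
    public static int deliver(Consumer<String> handler, String record, int maxAttempts) {
        int deliveries = 0;
        while (true) {
            deliveries++;
            try {
                handler.accept(record);
                return deliveries; // success: stop retrying
            } catch (RuntimeException e) {
                if (deliveries > maxAttempts) {
                    // retries exhausted: log and move on (the offset would be committed here)
                    return deliveries;
                }
            }
        }
    }

    public static void main(String[] args) {
        Consumer<String> alwaysFails = r -> { throw new RuntimeException("manually throw"); };
        System.out.println(deliver(alwaysFails, "hello", 9)); // 10
    }
}
```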
For example, sending a message:
```java
@GetMapping("/send/{msg}")
public String send(@PathVariable("msg") String msg) {
    CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("test-topic", msg);
    try {
        future.get();
        log.info("Message sent successfully");
    } catch (Exception e) {
        e.printStackTrace();
    }
    return "OK";
}
```

Receiving the message:
```java
@Component
public class DemoListener {

    private static final Logger log = LoggerFactory.getLogger(DemoListener.class);

    @KafkaListener(topics = {"test-topic"})
    public void onMessage(ConsumerRecord<String, String> record) {
        Object value = record.value();
        log.info("Received message: {}", value);
        throw new RuntimeException("manually throw");
    }
}
```

The Kafka configuration:
```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092 # Kafka broker address
    consumer:
      group-id: my-group # default consumer group ID
      auto-offset-reset: earliest # start from the earliest message if there is no initial offset or the offset is invalid
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
```
Now send a test message; the console output looks like this:
```
2025-09-14T10:26:27.508+08:00  INFO 5912 --- [nio-8080-exec-1] c.g.xjs.kafka.controller.DemoController  : Message sent successfully
2025-09-14T10:26:27.509+08:00  INFO 5912 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: hello
......
2025-09-14T10:26:31.666+08:00  INFO 5912 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: hello
2025-09-14T10:26:31.680+08:00  INFO 5912 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 6 for partition test-topic-0
2025-09-14T10:26:31.680+08:00  INFO 5912 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T10:26:32.174+08:00  INFO 5912 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: hello
2025-09-14T10:26:32.182+08:00 ERROR 5912 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} exhausted for test-topic-0@6

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.github.xjs.kafka.listener.DemoListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord)' threw exception
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2996) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2903) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2867) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2779) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2616) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2505) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2151) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1527) ~[spring-kafka-3.3.3.jar:3.3.3]
```

Customizing the retry logic
We can define our own DefaultErrorHandler bean to customize the retry logic, for example:
```java
@Bean
public DefaultErrorHandler errorHandler() {
    ExponentialBackOff backOff = new ExponentialBackOff();
    // maximum retry interval; the default is 30 seconds
    backOff.setMaxInterval(30000);
    // initial retry interval; the default is 2 seconds
    backOff.setInitialInterval(3000);
    // interval multiplier: next interval = current interval * multiplier; the default is 1.5
    backOff.setMultiplier(3);
    // maximum number of retries; unlimited by default. With the defaults, the first retry
    // comes after 2 seconds, the next after 2 * 1.5 = 3 seconds, and so on
    backOff.setMaxAttempts(2);
    return new DefaultErrorHandler(null, backOff);
}
```

Now send another message; the console output:
```
2025-09-14T10:42:32.069+08:00  INFO 1288 --- [nio-8080-exec-1] c.g.xjs.kafka.controller.DemoController  : Message sent successfully
2025-09-14T10:42:32.070+08:00  INFO 1288 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: hello
2025-09-14T10:42:35.128+08:00  INFO 1288 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 8 for partition test-topic-0
2025-09-14T10:42:35.129+08:00  INFO 1288 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T10:42:35.131+08:00  INFO 1288 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: hello
2025-09-14T10:42:44.193+08:00  INFO 1288 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 8 for partition test-topic-0
2025-09-14T10:42:44.193+08:00  INFO 1288 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T10:42:44.195+08:00  INFO 1288 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: hello
2025-09-14T10:42:44.199+08:00 ERROR 1288 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Backoff ExponentialBackOffExecution{currentInterval=9000ms, multiplier=3.0, attempts=2} exhausted for test-topic-0@8

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.github.xjs.kafka.listener.DemoListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord)' threw exception
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2996) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2903) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2867) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2779) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2616) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2505) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2151) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1527) ~[spring-kafka-3.3.3.jar:3.3.3]
```
As you can see, the message was received 3 times in total, including 2 retries: the first retry came after 3 seconds, the second after 9 seconds.
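The interval arithmetic can be sketched in plain Java (a standalone model of ExponentialBackOff's documented behavior, not Spring's actual implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class ExponentialIntervals {

    /** Computes the first n retry intervals: start at initial, multiply each time, cap at max. */
    public static List<Long> intervals(long initial, double multiplier, long max, int n) {
        List<Long> result = new ArrayList<>();
        long current = initial;
        for (int i = 0; i < n; i++) {
            result.add(Math.min(current, max));
            current = (long) (current * multiplier);
        }
        return result;
    }

    public static void main(String[] args) {
        // initialInterval=3000, multiplier=3, maxInterval=30000, as configured above
        System.out.println(intervals(3000, 3, 30000, 4)); // [3000, 9000, 27000, 30000]
    }
}
```

With maxAttempts set to 2, only the first two values (3 and 9 seconds) are used, matching the log timestamps above.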
Besides ExponentialBackOff, the common implementations are ExponentialBackOffWithMaxRetries and FixedBackOff, and you can of course write your own.
ExponentialBackOff retries indefinitely by default; the default maximum interval is 30 seconds, and anything longer is capped at 30 seconds.
ExponentialBackOffWithMaxRetries additionally lets you set a maximum number of retries.
FixedBackOff uses a fixed interval, 5 seconds by default, with no limit on the number of retries by default.
Head-of-line blocking and message loss
The exception handling shown above has two serious problems: head-of-line blocking and message loss. Head-of-line blocking means that while one message is being retried, the consumer cannot consume any new messages arriving on the topic: the consumer thread retries in a blocking fashion and only continues with later messages once the retries finish, so a long retry schedule leaves subsequent messages unconsumed for a long time. Message loss is easy to understand: once the retries are exhausted, all that happens is an error log line. A better approach is to publish the failed message to a dead-letter topic (DLT) and hand it over for manual follow-up. Let's deal with the message-loss problem first.
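The blocking effect can be demonstrated with a small single-threaded simulation (plain Java, no Kafka; the record list and the "poison" record are hypothetical stand-ins for a partition and a failing message):

```java
import java.util.ArrayList;
import java.util.List;

public class HeadOfLineSketch {

    /**
     * Processes records one by one on a single thread, retrying a failing record
     * up to maxAttempts times before giving up, like the blocking DefaultErrorHandler.
     * Returns the order in which delivery attempts happened.
     */
    public static List<String> process(List<String> records, String poisonRecord, int maxAttempts) {
        List<String> attempts = new ArrayList<>();
        for (String record : records) {
            int tries = 0;
            while (true) {
                tries++;
                attempts.add(record);
                boolean failed = record.equals(poisonRecord);
                if (!failed || tries > maxAttempts) {
                    break; // success, or retries exhausted: only now move to the next record
                }
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        // "1111" always fails; "2222" is only attempted after all of "1111"'s retries
        System.out.println(process(List.of("1111", "2222"), "1111", 2));
        // [1111, 1111, 1111, 2222]
    }
}
```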
Dead-letter topic
The DefaultErrorHandler constructor also takes a ConsumerRecordRecoverer. If we supply this recoverer, the message is handed to it once the retries are exhausted, and inside it we can republish the failed message to a DLT.
Fortunately, spring-kafka already provides a DeadLetterPublishingRecoverer that does exactly this.
Let's rewrite the DefaultErrorHandler:
```java
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, String> kafkaTemplate) {
    var recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
            (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition()));
    ExponentialBackOff backOff = new ExponentialBackOff();
    backOff.setMaxInterval(30000);
    backOff.setInitialInterval(3000);
    backOff.setMultiplier(3);
    backOff.setMaxAttempts(2);
    return new DefaultErrorHandler(recoverer, backOff);
}
```

Constructing a DeadLetterPublishingRecoverer requires a KafkaTemplate, and we also have to resolve the DLT topic and partition for each failed record.
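The destination resolver above is just a pure function from the failed record to a topic. Its routing rule can be isolated and tested on its own; the sketch below (plain Java, with `resolve` and the exception-based rule as hypothetical examples, not spring-kafka API) also shows a common variation where non-retryable errors go to a separate topic:

```java
public class DltRouting {

    /**
     * Resolves the dead-letter topic for a failed record.
     * The partition is kept the same; errors that can never succeed
     * could be routed to their own topic for separate handling.
     */
    public static String resolve(String topic, Exception cause) {
        if (cause instanceof IllegalArgumentException) {
            // e.g. payloads that can never be processed get their own topic
            return topic + ".invalid.DLT";
        }
        return topic + ".DLT";
    }

    public static void main(String[] args) {
        System.out.println(resolve("test-topic", new RuntimeException("manually throw")));
        // test-topic.DLT
    }
}
```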
Now send another message; the console logs:
```
2025-09-14T11:17:48.532+08:00  INFO 9804 --- [nio-8080-exec-4] c.g.xjs.kafka.controller.DemoController  : Message sent successfully
2025-09-14T11:17:48.533+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: hello
2025-09-14T11:17:51.609+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 10 for partition test-topic-0
2025-09-14T11:17:51.609+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:17:51.611+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: hello
2025-09-14T11:18:00.708+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 10 for partition test-topic-0
2025-09-14T11:18:00.708+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:18:00.710+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: hello
```

This time no exception is thrown, and we can see the message in the DLT:
```
D:\kafka_2.12-3.9.1> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic.DLT
hello
```

Non-blocking retries
Still using the code above, send 2 messages in quick succession; the console output is as follows:
```
2025-09-14T11:24:02.837+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 1111
2025-09-14T11:24:03.869+08:00  INFO 9804 --- [nio-8080-exec-8] c.g.xjs.kafka.controller.DemoController  : Message sent successfully
2025-09-14T11:24:05.914+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 11 for partition test-topic-0
2025-09-14T11:24:05.914+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:24:05.915+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 1111
2025-09-14T11:24:14.963+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 11 for partition test-topic-0
2025-09-14T11:24:14.963+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:24:14.965+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 1111
2025-09-14T11:24:15.470+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 12 for partition test-topic-0
2025-09-14T11:24:15.473+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 2222
2025-09-14T11:24:18.553+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 12 for partition test-topic-0
2025-09-14T11:24:18.553+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:24:18.554+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 2222
2025-09-14T11:24:27.609+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 12 for partition test-topic-0
2025-09-14T11:24:27.609+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:24:27.611+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 2222
2025-09-14T11:24:28.635+08:00  INFO 9804 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-my-group-1, groupId=my-group] Node -1 disconnected.
2025-09-14T11:24:58.128+08:00  INFO 9804 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node -1 disconnected.
```

Although the two messages were sent at almost the same time, the second message cannot be consumed while the first is being retried. Only after the first message's retries are exhausted does the second one get a chance. If the retry intervals and counts are large, this blocking style of retry is no longer appropriate.
Let's see how to use non-blocking retries instead:
```java
@Configuration
@EnableKafkaRetryTopic // non-blocking: 1
public class KafkaConfiguration {

    // non-blocking: 2
    @Bean
    TaskScheduler scheduler() {
        return new ThreadPoolTaskScheduler();
    }

    // non-blocking: 3
    @Bean
    public RetryTopicConfiguration myRetryConfiguration(KafkaTemplate<String, String> template) {
        return RetryTopicConfigurationBuilder.newInstance()
                .exponentialBackoff(3000, 10, Long.MAX_VALUE)
                .maxAttempts(3)
                .dltSuffix(".DLT")
                .create(template);
    }
}
```

First add the @EnableKafkaRetryTopic annotation, then provide a TaskScheduler bean, and finally provide a RetryTopicConfiguration bean.
Now restart the application, send 2 messages in quick succession, and watch the console output again:
```
2025-09-14T11:44:40.303+08:00  INFO 5380 --- [nio-8080-exec-8] c.g.xjs.kafka.controller.DemoController  : Message sent successfully
2025-09-14T11:44:40.304+08:00  INFO 5380 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 3333
2025-09-14T11:44:40.817+08:00  INFO 5380 --- [etry-3000-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-3, groupId=my-group] Seeking to offset 3 for partition test-topic-retry-3000-0
2025-09-14T11:44:40.817+08:00  INFO 5380 --- [etry-3000-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:44:41.284+08:00  INFO 5380 --- [nio-8080-exec-5] c.g.xjs.kafka.controller.DemoController  : Message sent successfully
2025-09-14T11:44:41.284+08:00  INFO 5380 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 4444
2025-09-14T11:44:43.316+08:00  INFO 5380 --- [etry-3000-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 3333
2025-09-14T11:44:43.826+08:00  INFO 5380 --- [etry-3000-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-3, groupId=my-group] Seeking to offset 4 for partition test-topic-retry-3000-0
2025-09-14T11:44:43.826+08:00  INFO 5380 --- [try-30000-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 3 for partition test-topic-retry-30000-0
2025-09-14T11:44:43.826+08:00  INFO 5380 --- [try-30000-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:44:43.828+08:00  INFO 5380 --- [etry-3000-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-3, groupId=my-group] Seeking to offset 4 for partition test-topic-retry-3000-0
2025-09-14T11:44:43.828+08:00  INFO 5380 --- [etry-3000-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:44:44.332+08:00  INFO 5380 --- [etry-3000-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 4444
2025-09-14T11:45:13.334+08:00  INFO 5380 --- [try-30000-0-C-1] c.g.xjs.kafka.listener.DemoListener      : Received message: 3333
2025-09-14T11:45:13.334+08:00 ERROR 5380 --- [try-30000-0-C-1] k.r.DeadLetterPublishingRecovererFactory : Record: topic test-topic-retry-30000, partition 0, offset 3, main topic test-topic threw an error at topic test-topic-retry-30000 and won't be retried. Sending to DLT with name test-topic.DLT.

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:3000) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2903) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2867) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2779) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2616) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2505) ~[spring-kafka-3.3.3.jar:3.3.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2151) ~[spring-kafka-3.3.3.jar:3.3.3]
	at ...
```

As you can see, there is no head-of-line blocking any more, and the message was still delivered to the DLT:
```
D:\kafka_2.12-3.9.1> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic.DLT
3333
4444
```

How non-blocking retries work
Let's list the topics in Kafka:
```
D:\kafka_2.12-3.9.1> .\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
__consumer_offsets
test-topic
test-topic-retry-3000
test-topic-retry-30000
test-topic.DLT
```

Notice there are now 2 extra retry topics: test-topic-retry-3000 and test-topic-retry-30000.
When processing fails, the message is forwarded to a retry topic. The consumer checks the record's timestamp and, if the retry time has not yet arrived, pauses consumption of that topic partition. When the retry time arrives, consumption of the partition resumes and the message is consumed again; this is why we had to configure a TaskScheduler. If processing fails again, the message is forwarded to the next retry topic, and the pattern repeats until processing succeeds or the attempts run out, at which point the message is sent to the DLT.
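The pause-until-due check can be modeled with a tiny pure function (a sketch of the idea only; the real pausing logic lives inside spring-kafka's listener container):

```java
public class RetryDueCheck {

    /** A record in a retry topic is due once its publish timestamp plus the topic's delay has passed. */
    public static boolean isDue(long recordTimestampMs, long topicDelayMs, long nowMs) {
        return nowMs >= recordTimestampMs + topicDelayMs;
    }

    public static void main(String[] args) {
        // published at t=0 into test-topic-retry-3000 (a 3-second delay)
        System.out.println(isDue(0, 3000, 1000)); // false: pause the partition
        System.out.println(isDue(0, 3000, 3000)); // true: resume and redeliver
    }
}
```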
In our case, with an exponential backoff policy starting at 3 seconds, a multiplier of 10, and at most 3 attempts, the framework automatically created test-topic-retry-3000, test-topic-retry-30000, and test-topic.DLT.
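The naming follows from the delay schedule: one retry topic per retry, suffixed with the delay in milliseconds, plus the DLT at the end. A plain-Java sketch of that derivation (an illustration of the pattern, not spring-kafka's actual naming code):

```java
import java.util.ArrayList;
import java.util.List;

public class RetryTopicNames {

    /**
     * Derives the retry topic names for an exponential backoff policy.
     * maxAttempts includes the first delivery, so there are maxAttempts - 1 retries.
     */
    public static List<String> names(String topic, long initialDelayMs, double multiplier,
                                     int maxAttempts, String dltSuffix) {
        List<String> result = new ArrayList<>();
        long delay = initialDelayMs;
        for (int i = 0; i < maxAttempts - 1; i++) {
            result.add(topic + "-retry-" + delay);
            delay = (long) (delay * multiplier);
        }
        result.add(topic + dltSuffix);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(names("test-topic", 3000, 10, 3, ".DLT"));
        // [test-topic-retry-3000, test-topic-retry-30000, test-topic.DLT]
    }
}
```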
Reference: https://docs.spring.io/spring-kafka/reference/index.html