做电商网站用什么软件开发,山东郓城网站建设,广告公司和设计公司,wordpress标签不被收录前言
笔者使用了kafka用来传输数据#xff0c;笔者在今年10月写了文章#xff0c;怎么使用配置化实现kafka的装载#xff1a;springboot kafka多数据源#xff0c;通过配置动态加载发送者和消费者-CSDN博客
不过在实际运行中#xff0c;kafka broker是加密的#xff0c…前言
笔者使用了kafka用来传输数据笔者在今年10月写了文章怎么使用配置化实现kafka的装载springboot kafka多数据源通过配置动态加载发送者和消费者-CSDN博客
不过在实际运行中kafka broker是加密的本来也没啥但是突然的一天笔者在监控发现消费者掉线了发送者居然还是正常的见鬼的事情就是这么朴实的发生了而且日志有Authentication/Authorization Exception and no authExceptionRetryInterval set
这样的错误还有
Closing the Kafka consumer
Kafka consumer has been closed
这样的日志非常诡异后面查询kafka集群才知道kafka集群在某个时间被改了AUTH配置然后又改回来了神奇的操作。
现象
实际上看到日志是懵的毕竟kafka是对方提供的AUTH用户名和密码也是对方给的而且发送者也出现AUTH失败但是发送者一直重试然后因为kafka集群的AUTH改回来了重试成功了唯独消费者AUTH失败后关闭了。
源码分析
从日志搜索Authentication/Authorization Exception and no authExceptionRetryInterval set
发现日志出现在
org.springframework.kafka.listener.KafkaMessageListenerContainer
刚好在消息监听器容器的内部类
ListenerConsumer
看源码定义是一个定时线程池的任务定义
private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback
分析怎么去消费的KafkaMessageListenerContainer的doStart方法 既然明晰了那么分析问题的来源, run方法 public void run() {ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());publishConsumerStartingEvent(); //事件通知spring事件this.consumerThread Thread.currentThread();setupSeeks();KafkaUtils.setConsumerGroupId(this.consumerGroupId);this.count 0;this.last System.currentTimeMillis();initAssignedPartitions(); // 初始分配partitionspublishConsumerStartedEvent(); // 消费者启动事件Throwable exitThrowable null;this.lastReceive System.currentTimeMillis();while (isRunning()) { //状态在上面的截图代码已经更新为运行状态try {pollAndInvoke(); // 队列拉取拉取过程会出现各种异常}catch (NoOffsetForPartitionException nofpe) { //这个需要注意但是这个是不可配置的this.fatalError true;ListenerConsumer.this.logger.error(nofpe, No offset and no reset policy);exitThrowable nofpe;break;}catch (AuthenticationException | AuthorizationException ae) { //这个是授权失败就是来源于kafka broker的auth鉴权if (this.authExceptionRetryInterval null) { //时间配置默认居然是nullListenerConsumer.this.logger.error(ae,Authentication/Authorization Exception and no authExceptionRetryInterval set);this.fatalError true;exitThrowable ae;break; //循环结束}else { // 一段时间后重试ListenerConsumer.this.logger.error(ae,Authentication/Authorization Exception, retrying in this.authExceptionRetryInterval.toMillis() ms);// We cant pause/resume here, as KafkaConsumer doesnt take pausing// into account when committing, hence risk of being flooded with// GroupAuthorizationExceptions.// see: https://github.com/spring-projects/spring-kafka/pull/1337sleepFor(this.authExceptionRetryInterval);}}catch (FencedInstanceIdException fie) {this.fatalError true;ListenerConsumer.this.logger.error(fie, ConsumerConfig.GROUP_INSTANCE_ID_CONFIG has been fenced);exitThrowable fie;break;}catch (StopAfterFenceException e) {this.logger.error(e, Stopping container due to fencing);stop(false);exitThrowable e;}catch (Error e) { // NOSONAR - rethrownthis.logger.error(e, Stopping container due to an Error);this.fatalError true;wrapUp(e);throw e;}catch (Exception e) {handleConsumerException(e);}finally {clearThreadState();}}//上面异常后这里的处理wrapUp(exitThrowable);}
注意NoOffsetForPartitionException和AuthenticationException | AuthorizationException其中能配置重试的是AuthenticationException | AuthorizationException异常就是授权失败可以通过配置过一段时间抢救一下一般而言kafka首次授权失败基本上就不太可能成功了但是这个只能控制consumer销毁producer还在重试所以出现了消费者销毁了发送者在kafka集群auth还原后成功恢复。看看wrapUp方法 既然知道了原理那么解决办法是配置
authExceptionRetryInterval解决方法
配置authExceptionRetryInterval也是不容易分析取值
private final Duration authExceptionRetryInterval this.containerProperties.getAuthExceptionRetryInterval();
从 containerProperties来的实际上是继承org.springframework.kafka.listener.ConsumerProperties 在spring-kafka 2.8版本还对属性命名重构了毕竟以前的命名字母太多了
不过要修改这个值也不容易kafkaproperties并没有提供这个参数而且创建消费者容器工厂时
org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory 那么只能在
ConcurrentMessageListenerContainer
创建成功后从这个里面读取配置设置默认值。这就可以使用前面埋点的spring事件了通过事件拿到Consumer不就可以修改配置了
使用
publishConsumerStartedEvent();
的事件最合适执行循环前最后一个事件而且这里的this就是消费者容器org.springframework.kafka.listener.KafkaMessageListenerContainer对象就是我们需要的 编写代码如下
Component
public class KafkaConsumerStartedListener implements ApplicationListenerConsumerStartedEvent {Overridepublic void onApplicationEvent(ConsumerStartedEvent event) {KafkaMessageListenerContainer?, ? container event.getSource(KafkaMessageListenerContainer.class);container.getContainerProperties().setAuthExceptionRetryInterval(Duration.of(30, ChronoUnit.SECONDS));}
}
就这样就解决了问题哈哈哈贼简单。
笔者在使用API时发现了还有一个API是授权失败后重启是布尔变量默认值是false即不重试。那么这个是哪里触发的呢在我们看到的消费者销毁事件里面实际上控制重试有多种办法一个是循环去kafka broker拉取一个是重启kafka消费者容器。 这个在上面已经说明了简单截个图再说明一下 设置的地方还是上面的代码里面同理也可以修改事件为
ConsumerStoppedEvent
可以更精准当然就用刚刚的代码加一行设置也是可以的。
总结
kafka在发送者和消费者是区分开的发送者如果连接kafka broker失败后可以一直重试直到成功但是消费者确有各种各样的逻辑可以精准控制比如消费者重启的配置
restartAfterAuthExceptions
可以控制消费者在停止时重启如果仅仅是授权失败而且不需要反复重启消耗资源那么可以通过
authExceptionRetryInterval
配置时间周期的方式实现但是kafka并没有给我们配置的入口但是kafka在消费者启动消费的过程埋了很多spring事件钩子通过这些钩子可以操作估计spring-kafka也不希望我们去修改毕竟消费者启动失败了或授权失败了消费者自动销毁是符合正常逻辑的。如果不使用kafka自己提供的事件可以在启动完成通过
org.springframework.kafka.config.KafkaListenerEndpointRegistry拿到所有消费者容器来批量设置属性毕竟spring-kafka也是通过这个端点注册器注册MessageListenerContainer的。