当前位置: 首页 > news >正文

做电商网站用什么软件开发山东郓城网站建设

做电商网站用什么软件开发,山东郓城网站建设,广告公司和设计公司,wordpress标签不被收录前言 笔者使用了kafka用来传输数据#xff0c;笔者在今年10月写了文章#xff0c;怎么使用配置化实现kafka的装载#xff1a;springboot kafka多数据源#xff0c;通过配置动态加载发送者和消费者-CSDN博客 不过在实际运行中#xff0c;kafka broker是加密的#xff0c…前言 笔者使用了kafka用来传输数据笔者在今年10月写了文章怎么使用配置化实现kafka的装载springboot kafka多数据源通过配置动态加载发送者和消费者-CSDN博客 不过在实际运行中kafka broker是加密的本来也没啥但是突然的一天笔者在监控发现消费者掉线了发送者居然还是正常的见鬼的事情就是这么朴实的发生了而且日志有Authentication/Authorization Exception and no authExceptionRetryInterval set 这样的错误还有 Closing the Kafka consumer Kafka consumer has been closed 这样的日志非常诡异后面查询kafka集群才知道kafka集群在某个时间被改了AUTH配置然后又改回来了神奇的操作。 现象 实际上看到日志是懵的毕竟kafka是对方提供的AUTH用户名和密码也是对方给的而且发送者也出现AUTH失败但是发送者一直重试然后因为kafka集群的AUTH改回来了重试成功了唯独消费者AUTH失败后关闭了。 源码分析 从日志搜索Authentication/Authorization Exception and no authExceptionRetryInterval set 发现日志出现在 org.springframework.kafka.listener.KafkaMessageListenerContainer 刚好在消息监听器容器的内部类 ListenerConsumer 看源码定义是一个定时线程池的任务定义 private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback 分析怎么去消费的KafkaMessageListenerContainer的doStart方法 既然明晰了那么分析问题的来源, run方法 public void run() {ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());publishConsumerStartingEvent(); //事件通知spring事件this.consumerThread Thread.currentThread();setupSeeks();KafkaUtils.setConsumerGroupId(this.consumerGroupId);this.count 0;this.last System.currentTimeMillis();initAssignedPartitions(); // 初始分配partitionspublishConsumerStartedEvent(); // 消费者启动事件Throwable exitThrowable null;this.lastReceive System.currentTimeMillis();while (isRunning()) { //状态在上面的截图代码已经更新为运行状态try {pollAndInvoke(); // 队列拉取拉取过程会出现各种异常}catch (NoOffsetForPartitionException nofpe) { //这个需要注意但是这个是不可配置的this.fatalError true;ListenerConsumer.this.logger.error(nofpe, No offset and no reset policy);exitThrowable nofpe;break;}catch (AuthenticationException | AuthorizationException ae) { //这个是授权失败就是来源于kafka broker的auth鉴权if (this.authExceptionRetryInterval null) { //时间配置默认居然是nullListenerConsumer.this.logger.error(ae,Authentication/Authorization Exception and no authExceptionRetryInterval set);this.fatalError true;exitThrowable ae;break; //循环结束}else { // 一段时间后重试ListenerConsumer.this.logger.error(ae,Authentication/Authorization Exception, retrying in this.authExceptionRetryInterval.toMillis() ms);// We cant pause/resume here, as KafkaConsumer doesnt take pausing// into account when committing, hence risk of being flooded with// GroupAuthorizationExceptions.// see: https://github.com/spring-projects/spring-kafka/pull/1337sleepFor(this.authExceptionRetryInterval);}}catch (FencedInstanceIdException fie) {this.fatalError true;ListenerConsumer.this.logger.error(fie, ConsumerConfig.GROUP_INSTANCE_ID_CONFIG has been fenced);exitThrowable fie;break;}catch (StopAfterFenceException e) {this.logger.error(e, Stopping container due to fencing);stop(false);exitThrowable e;}catch (Error e) { // NOSONAR - rethrownthis.logger.error(e, Stopping container due to an Error);this.fatalError true;wrapUp(e);throw e;}catch (Exception e) {handleConsumerException(e);}finally {clearThreadState();}}//上面异常后这里的处理wrapUp(exitThrowable);} 注意NoOffsetForPartitionException和AuthenticationException | AuthorizationException其中能配置重试的是AuthenticationException | AuthorizationException异常就是授权失败可以通过配置过一段时间抢救一下一般而言kafka首次授权失败基本上就不太可能成功了但是这个只能控制consumer销毁producer还在重试所以出现了消费者销毁了发送者在kafka集群auth还原后成功恢复。看看wrapUp方法 既然知道了原理那么解决办法是配置 authExceptionRetryInterval解决方法 配置authExceptionRetryInterval也是不容易分析取值 private final Duration authExceptionRetryInterval this.containerProperties.getAuthExceptionRetryInterval(); 从 containerProperties来的实际上是继承org.springframework.kafka.listener.ConsumerProperties 在spring-kafka 2.8版本还对属性命名重构了毕竟以前的命名字母太多了 不过要修改这个值也不容易kafkaproperties并没有提供这个参数而且创建消费者容器工厂时 org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory 那么只能在 ConcurrentMessageListenerContainer 创建成功后从这个里面读取配置设置默认值。这就可以使用前面埋点的spring事件了通过事件拿到Consumer不就可以修改配置了 使用 publishConsumerStartedEvent(); 的事件最合适执行循环前最后一个事件而且这里的this就是消费者容器org.springframework.kafka.listener.KafkaMessageListenerContainer对象就是我们需要的 编写代码如下 Component public class KafkaConsumerStartedListener implements ApplicationListenerConsumerStartedEvent {Overridepublic void onApplicationEvent(ConsumerStartedEvent event) {KafkaMessageListenerContainer?, ? container event.getSource(KafkaMessageListenerContainer.class);container.getContainerProperties().setAuthExceptionRetryInterval(Duration.of(30, ChronoUnit.SECONDS));} } 就这样就解决了问题哈哈哈贼简单。  笔者在使用API时发现了还有一个API是授权失败后重启是布尔变量默认值是false即不重试。那么这个是哪里触发的呢在我们看到的消费者销毁事件里面实际上控制重试有多种办法一个是循环去kafka broker拉取一个是重启kafka消费者容器。 这个在上面已经说明了简单截个图再说明一下 设置的地方还是上面的代码里面同理也可以修改事件为 ConsumerStoppedEvent 可以更精准当然就用刚刚的代码加一行设置也是可以的。 总结 kafka在发送者和消费者是区分开的发送者如果连接kafka broker失败后可以一直重试直到成功但是消费者确有各种各样的逻辑可以精准控制比如消费者重启的配置 restartAfterAuthExceptions 可以控制消费者在停止时重启如果仅仅是授权失败而且不需要反复重启消耗资源那么可以通过 authExceptionRetryInterval 配置时间周期的方式实现但是kafka并没有给我们配置的入口但是kafka在消费者启动消费的过程埋了很多spring事件钩子通过这些钩子可以操作估计spring-kafka也不希望我们去修改毕竟消费者启动失败了或授权失败了消费者自动销毁是符合正常逻辑的。如果不使用kafka自己提供的事件可以在启动完成通过 org.springframework.kafka.config.KafkaListenerEndpointRegistry拿到所有消费者容器来批量设置属性毕竟spring-kafka也是通过这个端点注册器注册MessageListenerContainer的。
http://www.zqtcl.cn/news/392038/

相关文章:

  • 市面上有什么搭建网站工作室石家庄做网站和宣传的
  • 视频图站主题 wordpress快速收录提交入口
  • 外贸视频网站投资理财网站开发
  • 专业建设网站多少钱铜川网站seo
  • 海外网站seo优化wordpress的代码逻辑
  • 怎样帮别人做网站哪有网站给光头强做面
  • 聊城营销网站建设价格网站设计论文框架
  • 成都哪家网站建设做得好介绍自己的家乡遵义网站建设
  • 阳春新农村建设网站欣赏网站
  • 永久免费企业网站建设杭州个人做网站
  • 博罗中山网站建设做网站的软件 知乎
  • 广州网站开发广州亦客网络解答wordpress换空间要改
  • 丽水企业网站开发企业erp系统是什么软件
  • 好看的网站设计个人发布信息的免费平台
  • 电商网站业务流程linux上传中文wordpress
  • 广州网站定制商家外贸seo网站推广
  • 许昌大成建设集团网站wordpress自动博客插件
  • wordpress网站地图插件中国来料加工网
  • 黑龙江做网站的公司上海企业网站建设公
  • 做公众号时图片的网站安徽建设工程造价信息网站
  • 网站开发的在淘宝上是什么类目深圳做网站的大公司
  • 手机网站 html5信阳哪里做网站
  • 网站服务器多少钱一月wordpress 博客宠物
  • 怎么制作网站游戏辽宁建设工程网
  • 网站开发好还要空间吗网站支付链接怎么做的
  • 网站制作报价图片欣赏杭州做网站价格
  • 帮人家做家务的网站host绑定网站
  • 地方门户网站盈利模式这样做微信网站
  • 企业网站要怎么做wordpress w3
  • 网站备案帐号找回密码seo优化工作有哪些