英文版企业网站布局设计,dw做的网站如何用手机看,做淘宝设计能做网站吗,互联网创业怎么起步Producer消息发送
producer.send(msg); // 用类似这样的方式去发送消息#xff0c;就会把消息给你均匀的分布到各个分区上去 producer.send(key, msg); // 订单id#xff0c;或者是用户id#xff0c;他会根据这个key的hash值去分发到某个分区上去#xff0c;他可以保证相同…Producer消息发送
producer.send(msg); // 用类似这样的方式去发送消息就会把消息给你均匀的分布到各个分区上去 producer.send(key, msg); // 订单id或者是用户id他会根据这个key的hash值去分发到某个分区上去他可以保证相同的key会路由分发到同一个分区上去。 每次发送消息都必须先把数据封装成一个ProducerRecord对象里面包含了要发送的topic具体在哪个分区分区key消息内容timestamp时间戳然后这个对象交给序列化器变成自定义协议格式的数据接着把数据交给partitioner分区器对这个数据选择合适的分区默认就轮询所有分区或者根据key来hash路由到某个分区这个topic的分区信息都是在客户端会有缓存的当然会提前跟broker去获取。接着这个数据会被发送到producer内部的一块缓冲区里然后producer内部有一个Sender线程会从缓冲区里提取消息封装成一个一个的batch然后每个batch发送给分区的leader副本所在的broker。
常见异常处理 常见的异常如下
1LeaderNotAvailableException某台机器挂了此时leader副本不可用会导致你写入失败要等待其他follower副本切换为leader副本之后才能继续写入此时可以重试发送即可。 2NotControllerException这个也是同理如果说Controller所在Broker挂了那么此时会有问题需要等待Controller重新选举此时也是一样就是重试即可 3NetworkException网络异常重试即可 参数retries 默认值是3 参数retry.backoff.ms 两次重试之间的时间间隔
提升消息吞吐量
1buffer.memory设置发送消息的缓冲区默认值是33554432就是32MB 如果发送消息出去的速度小于写入消息进去的速度就会导致缓冲区写满此时生产消息就会阻塞住所以说这里就应该多做一些压测尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住 Long startTimeSystem.currentTime();producer.send(record, new Callback() { Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if(exception null) {// 消息发送成功System.out.println(消息发送成功); } else {// 消息发送失败需要重新发送}}});Long endTimeSystem.currentTime();If(endTime - startTime 100){//说明内存被压满了说明有问题}
2compression.type默认是none不压缩但是也可以使用lz4压缩效率还是不错的压缩之后可以减小数据量提升吞吐量但是会加大producer端的cpu开销 3batch.size设置meigebatch的大小如果batch太小会导致频繁网络请求吞吐量下降如果batch太大会导致一条消息需要等待很久才能被发送出去而且会让内存缓冲区有很大压力过多数据缓冲在内存里 默认值是16384就是16kb也就是一个batch满了16kb就发送出去一般在实际生产环境这个batch的值可以增大一些来提升吞吐量可以自己压测一下 4linger.ms这个值默认是0意思就是消息必须立即被发送但是这是不对的一般设置一个100毫秒之类的这样的话就是说这个消息被发送出去后进入一个batch如果100毫秒内这个batch满了16kb自然就会发送出去。但是如果100毫秒内batch没满那么也必须把消息发送出去了不能让消息的发送延迟时间太长也避免给内存造成过大的一个压力。
请求超时 1max.request.size这个参数用来控制发送出去的消息的大小默认是1048576字节也就1mb这个一般太小了很多消息可能都会超过1mb的大小所以需要自己优化调整把他设置更大一些企业一般设置成10M 2request.timeout.ms这个就是说发送一个请求出去之后他有一个超时的时间限制默认是30秒如果30秒都收不到响应那么就会认为异常会抛出一个TimeoutException来让我们进行处理
ACK参数 acks参数其实是控制发送出去的消息的持久化机制的 1如果acks0那么producer根本不管写入broker的消息到底成功没有发送一条消息出去立马就可以发送下一条消息这是吞吐量最高的方式但是可能消息都丢失了你也不知道的但是说实话你如果真是那种实时数据流分析的业务和场景就是仅仅分析一些数据报表丢几条数据影响不大的。会让你的发送吞吐量会提升很多你发送弄一个batch出不需要等待人家leader写成功直接就可以发送下一个batch了吞吐量很大的哪怕是偶尔丢一点点数据实时报表折线图饼图。
2acksall或者acks-1这个leader写入成功以后必须等待其他ISR中的副本都写入成功才可以返回响应说这条消息写入成功了此时你会收到一个回调通知
3acks1只要leader写入成功就认为消息成功了默认给这个其实就比较合适的还是可能会导致数据丢失的如果刚写入leaderleader就挂了此时数据必然丢了其他的follower没收到数据副本变成leader
如果要想保证数据不丢失得如下设置 a)min.insync.replicas 2ISR里必须有2个副本一个leader和一个follower最最起码的一个不能只有一个leader存活连一个follower都没有了
b)acks -1每次写成功一定是leader和follower都成功才可以算做成功leader挂了follower上是一定有这条数据不会丢失
c) retries Integer.MAX_VALUE无限重试如果上述两个条件不满足写入一直失败就会无限次重试保证说数据必须成功的发送给两个副本如果做不到就不停的重试除非是面向金融级的场景面向企业大客户或者是广告计费跟钱的计算相关的场景下才会通过严格配置保证数据绝对不丢失
重试乱序 消息重试是可能导致消息的乱序的因为可能排在你后面的消息都发送出去了你现在收到回调失败了才在重试此时消息就会乱序所以可以使用“max.in.flight.requests.per.connection”参数设置为1这样可以保证producer同一时间只能发送一条消息
Consumer架构
Offset管理
每个consumer内存里数据结构保存对每个topic的每个分区的消费offset定期会提交offset老版本是写入zk但是那样高并发请求zk是不合理的架构设计zk是做分布式系统的协调的轻量级的元数据存储不能负责高并发读写作为数据存储。所以后来就是提交offset发送给内部topic__consumer_offsets提交过去的时候key是group.idtopic分区号value就是当前offset的值每隔一段时间kafka内部会对这个topic进行compact。也就是每个group.idtopic分区号就保留最新的那条数据即可。而且因为这个 __consumer_offsets可能会接收高并发的请求所以默认分区50个这样如果你的kafka部署了一个大的集群比如有50台机器就可以用50台机器来抗offset提交的请求压力就好很多。
Coordinator
Coordinator的作用 每个consumer group都会选择一个broker作为自己的coordinator他是负责监控这个消费组里的各个消费者的心跳以及判断是否宕机然后开启rebalance 根据内部的一个选择机制会挑选一个对应的BrokerKafka总会把你的各个消费组均匀分配给各个Broker作为coordinator来进行管理的consumer group中的每个consumer刚刚启动就会跟选举出来的这个consumer group对应的coordinator所在的broker进行通信然后由coordinator分配分区给你的这个consumer来进行消费。coordinator会尽可能均匀的分配分区给各个consumer来消费。 如何选择哪台是coordinator 首先对消费组的groupId进行hash接着对__consumer_offsets的分区数量取模默认是50可以通过offsets.topic.num.partitions来设置找到你的这个consumer group的offset要提交到__consumer_offsets的哪个分区。比如说groupId“membership-consumer-group” - hash值数字- 对50取模 - 就知道这个consumer group下的所有的消费者提交offset的时候是往哪个分区去提交offset找到__consumer_offsets的一个分区__consumer_offset的分区的副本数量默认来说1只有一个leader然后对这个分区找到对应的leader所在的broker这个broker就是这个consumer group的coordinator了consumer接着就会维护一个Socket连接跟这个Broker进行通信。
Rebalance策略
比如我们消费的一个主题有12个分区 p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11 假设我们的消费者组里面有三个消费者 1.range策略 range策略就是按照partiton的序号范围 p0~3 consumer1 p4~7 consumer2 p8~11 consumer3 默认就是这个策略
2.round-robin策略 consumer1:0,3,6,9 consumer2:1,4,7,10 consumer3:2,5,8,11
但是前面的这两个方案有个问题 假设consuemr1挂了:p0-5分配给consumer2,p6-11分配给consumer3 这样的话原本在consumer2上的的p6,p7分区就被分配到了 consumer3上。
3.sticky策略 最新的一个sticky策略就是说尽可能保证在rebalance的时候让原本属于这个consumer 的分区还是属于他们 然后把多余的分区再均匀分配过去这样尽可能维持原来的分区分配的策略
consumer10-3 consumer2: 4-7 consumer3: 8-11 假设consumer3挂了 consumer10-38,9 consumer2: 4-710,11
Rebalance分代机制 在rebalance的时候可能你本来消费了partition3的数据结果有些数据消费了还没提交offset结果此时rebalance把partition3分配给了另外一个cnosumer了此时你如果提交partition3的数据的offset能行吗必然不行所以每次rebalance会触发一次consumer group generation分代每次分代会加1然后你提交上一个分代的offset是不行的那个partiton可能已经不属于你了大家全部按照新的partiton分配方案重新消费数据。
Consumer核心参数
【heartbeat.interval.ms】 consumer心跳时间必须得保持心跳才能知道consumer是否故障了然后如果故障之后就会通过心跳下发rebalance的指令给其他的consumer通知他们进行rebalance的操作
【session.timeout.ms】 kafka多长时间感知不到一个consumer就认为他故障了默认是10秒
【max.poll.interval.ms】 如果在两次poll操作之间超过了这个时间那么就会认为这个consume处理能力太弱了会被踢出消费组分区分配给别人去消费一遍来说结合你自己的业务处理的性能来设置就可以了
【fetch.max.bytes】 获取一条消息最大的字节数一般建议设置大一些
【max.poll.records】 一次poll返回消息的最大条数默认是500条
【connection.max.idle.ms】 consumer跟broker的socket连接如果空闲超过了一定的时间此时就会自动回收连接但是下次消费就要重新建立socket连接这个建议设置为-1不要去回收
【auto.offset.reset】 earliest 当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时从头开始消费 topica - partition0:1000 partitino1:2000 latest 当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时从当前位置开始消费 none topic各分区都存在已提交的offset时从offset后开始消费只要有一个分区不存在已提交的offset则抛出异常
【enable.auto.commit】 这个就是开启自动提交唯一
【auto.commit.ineterval.ms】 这个指的是多久条件一次偏移量