当前位置: 首页 > news >正文

北京电商购物网站开发网站建设贵阳

北京电商购物网站开发,网站建设贵阳,wordpress设置角色,wordpress交易平台主题「Kafka」生产者篇 生产者发送消息流程 在消息发送的过程中#xff0c;涉及到了 两个线程 ——main 线程和Sender 线程。 在 main 线程中创建了 一个 双端队列 RecordAccumulator。 main线程将消息发送给RecordAccumulator#xff0c;Sender线程不断从 RecordAccumulator…「Kafka」生产者篇 生产者发送消息流程 在消息发送的过程中涉及到了 两个线程 ——main 线程和Sender 线程。 在 main 线程中创建了 一个 双端队列 RecordAccumulator。 main线程将消息发送给RecordAccumulatorSender线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。 main线程创建 Producer 对象调用 send 函数发送消息经过 拦截器 Interceptors可选项扩展一些额外功能序列化器 Serializer为什么不用Java的序列化因为大数据传输需要更轻量的序列化方式分区器 Partitioner需要判断发送到哪个分区 一个分区就会创建一个双端队列 RecordAccumulator创建队列都是在内存里完成的总大小默认为 32m。 双端队列 RecordAccumulator 还有一个内存池的概念每次 send 数据到队列后在存放数据的时候会从内存池中取出内存数据发送到kafka后释放内存归还到内存池一端创建内存另一端释放内存这也是它为什么设计为双端队列。 Sender线程从队列中拉取数据 每次批处理batch.size的大小默认为 16k延迟时间 linger.ms 默认为 0ms没有延迟。 这两个条件是 或 的关系两个条件达到任意一个就可以发送数据。 以节点的方式 key:value Broker1:(队列数据...) 的格式发送给对应的 kafka 服务器如果kafka没有应答默认每个broker节点队列最多缓存 5 个请求后续 生产经验—数据乱序 的章节会讲这个作用。 Selector负责打通底层的链路IO输入流 IO输出流经过Selector发送到kafka集群kafka集群进行副本的同步。如果kafka集群收到数据后会返回 ack有3种模式如上图。 如果ack返回成功则先清理掉缓存的Request请求然后清理到对应队列中的数据。如果ack返回失败则进行 retries 重试默认重试次数是int的最大值死磕一直发Request请求直到重试成功。详细讲解请参考下文的 生产经验—数据可靠性。 生产者重要参数列表 异步发送 同步发送外部数据发送到 RecordAccumulator 队列中等待这批数据都发送到 kafka 集群再返回。异步发送外部数据发送到 RecordAccumulator 队列中不管这些数据有没有发送到 kafka 集群直接返回。 默认为异步发送 普通异步发送 编写不带回调函数的代码 import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties;public class CustomProducer {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);// 3. 创建 kafka 生产者对象KafkaProducerString, String kafkaProducer new KafkaProducer(properties);// 4. 调用 send 方法,发送消息for (int i 0; i 5; i) {// 这里只指定了topic和valuekafkaProducer.send(new ProducerRecord(first, atguigu i));}// 5. 关闭资源kafkaProducer.close();} }回调异步发送 回调函数会在 producer 收到 ack 时调用为异步调用该方法有两个参数分别是元数据信息RecordMetadata和异常信息Exception。 如果 Exception 为 null说明消息发送成功如果 Exception 不为 null说明消息发送失败。 注意消息发送失败会自动重试不需要我们在回调函数中手动重试。 // 4. 调用 send 方法,发送消息 for (int i 0; i 5; i) {// 添加回调 CallbackkafkaProducer.send(new ProducerRecord(first, atguigu i), new Callback() {// 该方法在 Producer 收到 ack 时调用为异步调用Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception null) {// 没有异常输出信息到控制台System.out.println(主题 metadata.topic() - 分区 metadata.partition());} else {// 出现异常打印exception.printStackTrace();}}});// 延迟一会会看到数据发往不同分区Thread.sleep(2); }同步发送 只需在异步发送的基础上再调用一下 get() 方法即可。 生产者分区 分区好处 可以通过机器的存储能力自定义分区数据比如 broker0 存储 20T 数据broker1和2分别存储 40T 数据。 生产者发送消息的分区策略 可阅读详解Kafka分区机制原理Kafka 系列 二 默认的分区器 DefaultPartitioner /*** The default partitioning strategy: 默认分区策略* 如果你指定了分区则直接用这个分区* 如果没指定分区但有key则按照key的hash值 % 分区数* 如果既没指定分区也没指定key则按照粘性分区处理。* See KIP-480 for details about sticky partitioning.*/ public class DefaultPartitioner implements Partitioner {... }ProducerRecord 类的构造方法就表示了这 3 种分区策略 自定义分区器 定义类实现 Partitioner 接口 重写 partition() 方法 import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map;/*** 1. 实现接口 Partitioner* 2. 实现3个方法: partition、close、configure* 3. 编写 partition 方法返回分区号*/ public class MyPartitioner implements Partitioner {/*** 返回信息对应的分区* param topic 主题* param key 消息的 key* param keyBytes 消息的 key 序列化后的字节数组* param value 消息的 value* param valueBytes 消息的 value 序列化后的字节数组* param cluster 集群元数据可以查看分区信息* return*/Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取消息String msgValue value.toString();// 创建 partitionint partition;// 判断消息是否包含 atguiguif (msgValue.contains(atguigu)) {partition 0;} else {partition 1;}// 返回分区号return partition;}// 关闭资源Overridepublic void close() {}// 配置方法Overridepublic void configure(MapString, ? configs) {} }使用分区器的方法在生产者的配置中添加分区器参数 import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties;public class CustomProducerCallbackPartitions {public static void main(String[] args) {Properties properties new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 添加自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, com.atguigu.kafka.producer.MyPartitioner);KafkaProducerString, String kafkaProducer new KafkaProducer(properties);for (int i 0; i 5; i) {kafkaProducer.send(new ProducerRecord(first, atguigu i), new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {if (e null) {System.out.println(主题 metadata.topic() - 分区 metadata.partition());} else {e.printStackTrace();}}});}kafkaProducer.close();} }生产者如何提高吞吐量 合理调整 batch.size 和 linger.ms 的参数值采用数据压缩调整缓冲区大小 import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties;public class CustomProducerParameters {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// batch.size批次大小默认 16Kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// linger.ms等待时间默认 0msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// RecordAccumulator缓冲区大小默认 32Mbuffer.memoryproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// compression.type压缩默认 none可配置值 gzip、snappy、lz4 和 zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, snappy);// 3. 创建 kafka 生产者对象KafkaProducerString, String kafkaProducer new KafkaProducer(properties);// 4. 调用 send 方法,发送消息for (int i 0; i 5; i) {kafkaProducer.send(new ProducerRecord(first, atguigu i));}// 5. 关闭资源kafkaProducer.close();} }生产经验—数据可靠性 回顾发送流程 数据可靠性主要根据 kafka 集群返回给我们的 ack。 ack 应答原理 ack0不需要等待数据落盘应答一直发送给 kafka很容易丢数据。 数据发送到 Leader 后Leader 挂掉了此时数据还在内存中未落盘数据丢失。 ack1不需要等待 kafka 主从同步完成Leader 收到数据落盘后应答。 Leader 成功落盘但还未同步给 FollowerLeader 挂了数据丢失。 ack-1需要等待 Leader 和 ISR 队列里面的所有节点收齐数据后应答。 数据完全可靠条件 数据完全可靠条件 ACK 级别设置为-1 分区副本大于等于2 ISR里应答的最小副本数量大于等于2 注意这里的“副本”并不是指的 Follower在 Kafka 中副本分为 Leader 副本和 Follower 副本。Leader 副本负责处理消息而 Follower 副本则简单地复制 Leader 副本的数据。 也就是一个分区至少要有 1 个 Leader 和 1 个 FollowerISR 队列最少也要有 1 个 Leader 和 1 个 Follower。 一个分区至少有 1 个 Leader所以每个 Partition 都会有一个 ISR而且是由 Leader 动态维护。 可靠性总结 acks0生产者发送过来数据就不管了可靠性差效率高acks1生产者发送过来数据 Leader 应答可靠性中等效率中等acks-1生产者发送过来数据 Leader 和 ISR 队列里面所有 Follwer 应答可靠性高效率低在生产环境中 acks0很少使用acks1一般用于传输普通日志允许丢个别数据acks-1一般用于传输和钱相关的数据对可靠性要求比较高的场景。 代码实现 // 设置 acks-1 properties.put(ProducerConfig.ACKS_CONFIG, all); // 重试次数 retries默认是 int 最大值2147483647 properties.put(ProducerConfig.RETRIES_CONFIG, 3);拓展 生产者将数据发送给 Leader并且完成同步给 Follower此时回复 ack 时Leader 挂了kafka 会挑一个 Follower 成为新的 Leader因为生产者没有收到 ack此时就会认为他的数据没有发送到 kafka就会进行重试导致新 Leader 重复接收了两份数据。 生产经验—数据去重 数据传递语义 幂等性 幂等性原理 如何使用幂等性 开启参数 enable.idempotence默认为 true默认开启。 生产者事务 幂等性只能保证单分区单会话的不重复一旦 kafka 挂掉重启还是有可能产生重复数据。如果想完全去重就必须使用事务。 Kafka 事务原理 幂等性如果 kafka 挂掉重启会重新生成一个 PID所以可能会有重复。事务kafka 根据全局唯一的 transactional.id 会划分到50个分区中的某一个分区这些分区的信息是存储在一个特殊 Topic 里的而 Topic 的底层就是硬盘所以即使客户端挂掉了重启后也能继续处理未完成的事务因为有 transactional.id 存在。 Kafka 的事务一共有如下 5 个 API // 1. 初始化事务 void initTransactions();// 2. 开启事务 void beginTransaction() throws ProducerFencedException;// 3. 在事务内提交已经消费的偏移量主要用于消费者 void sendOffsetsToTransaction(MapTopicPartition, OffsetAndMetadata offsets, String consumerGroupId) throws ProducerFencedException;// 4. 提交事务 void commitTransaction() throws ProducerFencedException;// 5. 放弃事务类似于回滚事务的操作 void abortTransaction() throws ProducerFencedException;单个 Producer使用事务保证消息的仅一次发送 import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties;public class CustomProducerTransactions {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);// key,value 序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置事务 id必须事务 id 任意起名要求全局唯一properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transaction_id_0);// 3. 创建 kafka 生产者对象KafkaProducerString, String kafkaProducer new KafkaProducer(properties);// 初始化事务kafkaProducer.initTransactions();// 开启事务kafkaProducer.beginTransaction();try {// 4. 调用 send 方法,发送消息for (int i 0; i 5; i) {// 发送消息kafkaProducer.send(new ProducerRecord(first, atguigu i));}// int i 1 / 0;// 提交事务kafkaProducer.commitTransaction();} catch (Exception e) {// 终止事务kafkaProducer.abortTransaction();} finally {// 5. 关闭资源kafkaProducer.close();}} }生产经验—数据有序 仅能保证单分区内有序如果想保证全局有序只能把所有分区的消息都拉到消费者端进行一个全排序再进行消费。 但需要等所有数据到齐了再进行排序效率可能还不如单分区。 生产经验—数据乱序 一个 broker 可以有一个 broker 缓存队列队列中存放的是还未收到 ack 的请求最多能存放 5 个。 比如发送 Request1 后对方没有应答此时还可以发送 Request2、Request3、Request4、Request5最多能发送 5 次请求。 假设在一个分区中生产者发送了 Request1、Request2 请求都成功了但 Request3 请求发送失败了进行重试但此时 Request4 请求发送成功了然后 Request3 请求才发送成功此时到达 kafka 的顺序就为 1 2 4 3是乱序的。 kafka在1.x版本之前保证数据单分区有序条件如下 max.in.flight.requests.per.connection1不需要考虑是否开启幂等性。 也就是 broker 的缓存队列只允许有 1 个请求这个请求收到 ack 后才能发送下一个。 kafka在1.x及以后版本保证数据单分区有序条件如下 开启幂等性 max.in.flight.requests.per.connection 需要设置小于等于 5。 未开启幂等性 max.in.flight.requests.per.connection 需要设置为 1和kafka在1.x版本之前一样。 原因说明因为在 kafka1.x 以后启用幂等后kafka 服务端最多会缓存 producer 发来的最近 5 个 request 的元数据。 故无论如何都可以保证最近 5 个 request 的数据都是有序的。 比如先来的 Request1、Request2服务端根据 SeqNumber 判断数据是否是单调递增的如果符合则直接进行落盘但下一个请求是 Request4正常应该是 Request3所以 Request4 这个请求只能在内存中放着不能进行落盘再下一个是 Request5同样不能进行落盘直到 Request3 来了然后对他们进行排序然后再依次落盘 Request3、Request4、Request5。 笔记整理自b站尚硅谷视频教程【尚硅谷】Kafka3.x教程从入门到调优深入全面
http://www.zqtcl.cn/news/38417/

相关文章:

  • SEO网站链接模型32岁学做网站
  • 中文设计网站p2p金融网站开发
  • 网站建设排行公司个人简历模板下载 免费
  • 什么网站可以做长图编程软件scratch免费下载
  • 做数据结构基础的网站网线制作工具
  • 中国建筑人才招聘网站seo 规范
  • 建网站要多少钱呢泉州建设网站开发
  • 英文网站模板cmsapp服务器搭建教程
  • 网站服务器租用多少钱才合理呢什么是关键词
  • 学科网站建设seo关键词快速提升软件官网
  • 好的网页网站设计陕西住建执业证书官网
  • 什么样的网站不备案嘉兴中元建设网站
  • 网站开发与维护的内容郑州建网站价格
  • 网站如何做付费网站模板下载软件
  • 肖港网站开发做网站练手
  • 做什么网站开发好中企动力总部在哪
  • 手机网站建设视频教程郑州网站建设 郑州网站设计
  • 在一个空间建两个网站wordpress extended rss
  • 做自媒体的素材网站python做的网站多吗
  • 做平行进口的汽车网站注册网站的软件
  • 厦门 网站 开发企业网站建设数据现状分析
  • vs做的网站项目可以改名字吗陕西交通建设集团公司网站
  • 手机网站头部图片怎么做西宁做网站君博示范
  • 网站改版公告一键做网站
  • 成都医院网站建设用dw做的网站怎么上传图片
  • dede网站地图htmlwordpress段落缩进
  • 0基础学做网站教程门户网站都有哪些
  • 化妆品网站建设实训总结重庆市建设政务中心网站
  • 重庆网站制作定制网站建设收获
  • 云南住房和城乡建设厅网站首页郑州发布会最新消息