当前位置: 首页 > news >正文

wordpress淘宝客建站教程视频wordpress弹出框插件

wordpress淘宝客建站教程视频,wordpress弹出框插件,定制礼品,网站建设方案书要怎么样写前言 Kafka发送消息是异步发送的#xff0c;所以我们不知道消息是否发送成功#xff0c;所以会可能造成消息丢失。而且Kafka架构是由生产者-服务器端-消费者三种组成部分构成的。要保证消息不丢失#xff0c;那么主要有三种解决方法#xff1a; 生产者#xff08;producer… 前言 Kafka发送消息是异步发送的所以我们不知道消息是否发送成功所以会可能造成消息丢失。而且Kafka架构是由生产者-服务器端-消费者三种组成部分构成的。要保证消息不丢失那么主要有三种解决方法 生产者producer保持同步发送消息服务器端broker持久化设置为同步刷盘消费者consumer设置为手动提交偏移量offset 1.生产者producer端 处理 生产者默认发送消息代码如下 import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties;public class KafkaMessageProducer {public static void main(String[] args) {// 配置Kafka生产者Properties props  new Properties();props.put(bootstrap.servers, localhost:9092); // Kafka集群地址props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer); // 键的序列化器props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer); // 值的序列化器// 创建Kafka生产者实例ProducerString, String producer  new KafkaProducer(props);String topic  my-topic; // Kafka主题try {// 发送消息到Kafkafor (int i  0; i  10; i) {String message  Message   i;ProducerRecordString, String record new ProducerRecord(topic, message);producer.send(record);System.out.println(Sent message:   message);}} catch (Exception e) {e.printStackTrace();} finally {// 关闭Kafka生产者producer.close();}} }请确保在运行代码之前已经设置好正确的Kafka集群地址、主题名称以及依赖的Kafka客户端库。该示例代码创建了一个Kafka生产者实例使用字符串作为键和值的序列化器并循环发送10条消息到指定的Kafka主题。 生产者端要保证消息发送成功可以有两个方法 把异步发送改成同步发送这样producer就能实时知道消息的发送结果。 要将 Kafka 发送方法改为同步发送可以使用 send() 方法的返回值FutureRecordMetadata 并调用 get() 方法来等待发送完成。 以下是将 Kafka 发送方法改为同步发送的示例代码 import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.producer.RecordMetadata;public class KafkaMessageProducer {public static void main(String[] args) {// 配置 Kafka 生产者Properties props  new Properties();props.put(bootstrap.servers, localhost:9092); // Kafka 集群地址props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer); // 键的序列化器props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer); // 值的序列化器// 创建 Kafka 生产者实例ProducerString, String producer  new KafkaProducer(props);String topic  my-topic; // Kafka 主题try {// 发送消息到 Kafkafor (int i  0; i  10; i) {String message  Message   i;ProducerRecordString, String record new ProducerRecord(topic, message);RecordMetadata metadata  producer.send(record).get(); // 同步发送并等待发送完成System.out.println(Sent message:   message  , offset:   metadata.offset());}} catch (InterruptedException | ExecutionException e) {e.printStackTrace();} finally {// 关闭 Kafka 生产者producer.close();}} }在这个示例代码中通过调用 send(record).get() 实现了同步发送其中 get() 方法会阻塞当前线程直到发送完成并返回消息的元数据。 添加异步回调函数来监听消息发送的结果如果发送失败可以在回调函数里重新发送。 要保持发送消息成功并添加回调函数你可以在发送消息的时候指定一个回调函数作为参数。回调 函数将在消息发送完成后被调用以便你可以在回调函数中处理发送结果。 以下是使用回调函数进行消息发送的示例代码 import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata;public class KafkaMessageProducer {public static void main(String[] args) {// 配置 Kafka 生产者Properties props  new Properties();props.put(bootstrap.servers, localhost:9092); // Kafka 集群地址props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer); // 键的序列化器props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer); // 值的序列化器// 创建 Kafka 生产者实例ProducerString, String producer  new KafkaProducer(props);String topic  my-topic; // Kafka 主题try {// 发送消息到 Kafkafor (int i  0; i  10; i) {String message  Message   i;ProducerRecordString, String record new ProducerRecord(topic, message);// 发送消息并指定回调函数producer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception  null) {System.out.println(Sent message:   message  , offset:   metadata.offset());} else {// 这里重新发送消息producer.send(record);exception.printStackTrace();}}});}} finally {// 关闭 Kafka 生产者producer.close();}} }在这个示例代码中我们使用了 send(record, callback) 方法来发送消息并传递了一个实现了 Callback 接口的匿名内部类作为回调函数。当消息发送完成后回调函数的 onCompletion() 方法会被调用。你可以根据 RecordMetadata 和 Exception 参数来处理发送结果。 另外producer还提供了一个重试参数这个参数叫retries如果因为网络问题或者Broker故障导致producer发送消息失败那么producer会根据这个参数的值进行重试发送消息。 2.服务器Broker端 处理 Kafka Broker服务器端通过以下方式来确保生产者端消息发送的成功和不丢失 1. 消息持久化异步刷盘Kafka Broker将接收到的消息持久化到磁盘上的日志文件中。这样即使在消息发送后发生故障Broker能够恢复并确保消息不会丢失。注意持久化是由操作系统调度的如果持久化之前系统崩溃了那么就因为不能持久化导致数据丢失但是Kafka没提供同步刷盘策略 2. 复制与高可用性Kafka支持分布式部署可以将消息分布到多个Broker上形成一个Broker集群。在集群中消息被复制到多个副本中以提供冗余和高可用性。生产者发送消息时它可以将消息发送到任何一个Broker然后Broker将确保消息在集群中的所有副本中都被复制成功。 3. 消息提交确认当生产者发送消息后在收到Broker的确认响应之前生产者会等待。如果消息成功写入并复制到了指定的副本中Broker会发送确认响应给生产者。如果生产者在指定的时间内没有收到确认响应它将会尝试重新发送消息以确保消息不会丢失。 4. 可靠性设置同步刷盘生产者可以配置一些参数来提高消息发送的可靠性。例如可以设置acks参数来指定需要收到多少个Broker的确认响应才认为消息发送成功。可以将acks设置为all表示需要收到所有副本的确认响应才算发送成功。 总之Kafka Broker通过持久化和复制机制以及消息确认和可靠性设置确保生产者端的消息发送成功且不丢失。同时应注意及时处理可能的错误情况并根据生产者端需求和场景合理配置相应的参数。 另外 参数 acks 是用来设置生产者在发送消息后等待确认响应的方式可以设置以下三个值之一 1. acks0生产者不会等待任何来自服务器的确认响应。消息被立即认为已发送成功但这也意味着如果服务器没有成功接收消息生产者将无法得知。这种设置下存在消息丢失的风险因此并不推荐在关键业务中使用。 2. acks1生产者在消息被写入服务器的leader副本后会收到一个确认响应。这意味着leader副本已收到消息并写入磁盘但其他副本尚未必需收到消息。这种设置下生产者可以获得基本的消息可靠性因为只要leader副本可达并写入成功生产者就会收到一个确认。 acksall或acks-1生产者在消息被写入服务器的所有leader副本后才会收到一个确认响应。这意味着所有副本都已成功接收并写入消息。这种设置下生产者可以获得最高级别的消息可靠性但会降低生产者的吞吐量因为需要等待更多的确认。 对于使用YAML文件进行Kafka配置的情况你可以按照以下格式设置acks参数 # Kafka生产者配置 producer:bootstrap.servers: your-kafka-server:9092acks: all        # 设置acks参数为allkey.serializer: org.apache.kafka.common.serialization.StringSerializervalue.serializer: org.apache.kafka.common.serialization.StringSerializer需要根据具体的业务需求来选择适当的acks值。对于关键业务建议使用acksall以确保消息的完全可靠性。对于一些非关键的应用轻微的消息丢失可能是可以接受的可以使用acks1来平衡可靠性和吞吐量。 3.消费者Concumer端 处理 Kafka Consumer 默认会确保消息的至少一次传递at least once delivery。这意味着当 Consumer 完成对一条消息的处理后会向 Kafka 提交消息的偏移量offset告知 Kafka 这条消息已被成功处理。如果 Consumer 在处理消息时发生错误可以通过回滚偏移量来重试处理之前的消息。 以下是一些确保消息消费成功的方法 1. 使用自动提交偏移量Auto Commit Offsets默认情况下Kafka Consumer 在消费消息后会自动提交偏移量。你可以通过设置 enable.auto.commit 属性为 false 来关闭自动提交然后在成功处理消息后手动提交偏移量。这样可以确保只有在消息成功处理后才提交偏移量以避免消息丢失。 2. 手动提交偏移量Manual Commit Offsets使用手动提交偏移量的方式可以更加精确地控制偏移量的提交时机。在成功处理消息后通过调用 commitSync() 或 commitAsync() 方法来手动提交偏移量。你可以针对每个分区或每批消息进行偏移量的提交以便在发生错误时能精确到达到处理过的最后一条消息。 3. 设置消费者的最大重试次数你可以在消费消息的处理逻辑中实现重试机制当处理失败时进行重试。可以使用一个计数器来限制重试次数以防止无限重试导致循环消费消息。 4. 设置适当的消费者参数根据你的需求你可以根据消息量、处理能力等因素来调整消费者的配置参数以确保消费者的性能和可靠性。例如可以适当增加消费者的并行度设置更多的线程或消费者实例来提高吞吐量和容错性。 记住尽管 Kafka 提供了可靠的消息传递机制但仍然需要在消费者端实现适当的错误处理和重试逻辑以处理可能发生的错误情况。 4.延申 Kafka 写入磁盘的日志文件主要用于持久化消息数据以确保数据的可靠性和持久性。下面是一些作用 1. 数据持久化Kafka使用日志文件来保存消息数据确保即使在发生故障或重启后数据也能够持久存储在磁盘上。这样可以有效地避免数据丢失。 2. 数据复制Kafka允许在不同的服务器之间进行数据复制以提高容错能力和可用性。写入磁盘的日志文件可以被复制到其他副本中以实现数据的冗余存储和故障恢复。 3. 数据回放Kafka的日志文件可以按顺序存储消息数据使得可以根据偏移量offset进行可靠的数据回放操作。消费者可以根据需要重新读取并处理存储在日志文件中的消息。 4. 顺序写入Kafka通过将消息追加到日志文件末尾的方式进行写入这种顺序写入的方式对于磁盘IO操作更为友好可以提高写入性能和吞吐量。 总之Kafka写入磁盘的日志文件可以确保消息数据的持久化、可靠性和顺序性提供高性能的消息传递和数据处理能力。 什么是ACK? ACK (Acknowledge character即是确认字符在数据通信中接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误。在TCP/IP协议中如果接收方成功的接收到数据那么会回复一个ACK数据。通常ACK信号有自己固定的格式,长度大小,由接收方回复给发送方。 什么是ISR? ISR全称是“In-Sync Replicas”也就是保持同步的副本他的含义就是跟Leader始终保持同步的Follower有哪些。所以每个Partition都有一个ISR这个ISR里一定会有Leader自己因为Leader肯定数据是最新的然后就是那些跟Leader保持同步的Follower也会在ISR里。Leader负责跟踪与维护ISR列表。如果一个 Follower 宕机或者落后太多落后多少由参数replica.lag.time.max.ms控制Leader 将把它从 ISR 中移除。如果Leader发生故障或挂掉一个新Leader被选举并被接受客户端的消息成功写入。Kafka确保从同步副本列表中选举一个副本为Leader新的Leader继续服务客户端的读写请求。 什么是HW HW俗称高水位是HighWatermark的缩写。它标识了一个特定的消息偏移量offset消费者只能拉取到这个offset之前的消息。 什么是LEO LEO Log End Offset标识当前日志文件中下一条待写入的消息的offset。LEO 的大小相当于当前日志分区中最后一条消息的offset值加1.分区 ISR 集合中的每个副本都会维护自身的 LEO 而 ISR 集合中最小的 LEO 即为分区的 HW对消费者而言只能消费 HW 之前的消息。
http://www.zqtcl.cn/news/147576/

相关文章:

  • 苏州网站公司排名前十最好看的视频免费下载
  • 快速设计一个网站wordpress4.9.6
  • 网站建立教学深圳宝安网站建设公司推荐
  • 深圳企业网站建设制作公司叶县红色家园网站建设
  • 网站制作报价被哪些因素影响建设银行官方网站首页个人登录
  • 免费网站怎么建谁能给个网站谢谢
  • 吴忠网站建设家里面的服务器可以做网站吗
  • 这是我自己做的网站做网站前台要学什么课程
  • 程序网站开发建设隔离变压器移动网站
  • 网站设置不发送消息怎么设置回来用typecho做的网站
  • 网站机房建设嵌入式培训机构哪家好
  • 购物网站页面设计图片网站 签约
  • 上海网站改版方案网站邮件设置
  • 如何在自己网站添加链接高端品牌logo图片
  • 网站建设找c宋南南app软件设计
  • 龙岗网站推广seo 0xu
  • 成都做网站微网站后台录入
  • 开发区网站建设山东房地产新闻
  • 手机如何搭建网站网站菜单导航
  • 网站建设丿金手指专业社交投票论坛网站开发
  • 做一套网站开发多少钱设计高端的国外网站
  • 有没有网站做lol网站的网页设计实验报告书
  • 网站后台域名重庆好的seo平台
  • 文化建设设计公司网站跨境电商亚马逊
  • 建设企业网站官网下载中心游戏网站开发设计报告
  • 外贸网站导航栏建设技巧专做奢侈品品牌的网站
  • 网站开发工程师资格证网站建设代理都有哪些
  • 汕头网站建设技术托管wordpress faq
  • 外贸网站建设系统能联系做仿瓷的网站
  • 阿里云网站域名绑定做网站的需要哪些职位