兼职做商务标哪个网站,中国网站设计公司,网页编辑word文档,软件开发专业课程有哪些目录 RabbitMQ保证消息可靠性
生产者丢失消息
MQ丢失消息
消费端丢失了数据
Kakfa的消息可靠性
生产者的消息可靠性
Kakfa的消息可靠性
消费者的消息可靠性 RabbitMQ保证消息可靠性 生产者丢失消息
1.事务消息保证
生产者在发送消息之前#xff0c;开启事务消息随后生…目录 RabbitMQ保证消息可靠性
生产者丢失消息
MQ丢失消息
消费端丢失了数据
Kakfa的消息可靠性
生产者的消息可靠性
Kakfa的消息可靠性
消费者的消息可靠性 RabbitMQ保证消息可靠性 生产者丢失消息
1.事务消息保证
生产者在发送消息之前开启事务消息随后生产者发送消息消息发送之后如果消息没有被MQ接收到的话生产者会收到异常报错生产者回滚事务然后重试消息如果收到了消息就能提交事务了
Autowired
private RabbitTemplate rabbitTemplate;public void sendTransactionalMessage() {ConnectionFactory connectionFactory rabbitTemplate.getConnectionFactory();Channel channel connectionFactory.createConnection().createChannel(false);try {channel.txSelect(); // 开启事务channel.basicPublish(exchange, routing.key, null, message.getBytes());channel.txCommit(); // 提交事务} catch (Exception e) {channel.txRollback(); // 出错回滚}
}
2.使用confirm机制
普通confirm机制就是发送消息之后等待服务器confirm之后再发送下一个消息
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - {if (ack) {System.out.println(消息成功发送到Broker);} else {System.out.println(消息发送失败原因 cause);}
});rabbitTemplate.convertAndSend(exchange, routing.key, message);
批量confirm机制每发送一批消息之后等待服务器confirm
Channel channel connection.createChannel(false);
channel.confirmSelect();for (int i 0; i 100; i) {channel.basicPublish(exchange, routing.key, null, (msg i).getBytes());
}
channel.waitForConfirms(); // 等待所有消息确认异步confirm机制服务器confirm一个或者多个消息之后客户端生产者能够通过回调函数来确定消息是否被confirm推荐
SortedSetLong pendingSet Collections.synchronizedSortedSet(new TreeSet());
channel.confirmSelect();channel.addConfirmListener(new ConfirmListener() {public void handleAck(long tag, boolean multiple) {if (multiple) pendingSet.headSet(tag 1).clear();else pendingSet.remove(tag);}public void handleNack(long tag, boolean multiple) {System.err.println(未确认消息 tag);if (multiple) pendingSet.headSet(tag 1).clear();else pendingSet.remove(tag);}
});while (true) {long seq channel.getNextPublishSeqNo();channel.basicPublish(demo.exchange, demo.key,MessageProperties.PERSISTENT_TEXT_PLAIN, hello.getBytes());pendingSet.add(seq);
}MQ丢失消息
防止MQ的丢失数据的话方法就是开启RabbitMQ的持久化消息写入之后也就是到了MQ之后就直接持久化到磁盘中即使Rabbimq自己挂了之后会恢复数据。
设置持久化步骤
创建queue的时候直接设置持久化此时就能持久化queue的元数据不是消息
Bean
public Queue durableQueue() {return new Queue(myQueue, true); // true 表示持久化
}发送消息的时候指定消息为deliveryMode设置为2也就是设置消息为持久化此时消息可以持久化磁盘上
MessageProperties props new MessageProperties();
props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message new Message(content.getBytes(), props);
rabbitTemplate.send(exchange, routing.key, message);极端情况
消息写到RabbitMQ之后但是还没有持久化到磁盘之后直接挂了导致内存中消息丢失。
解决方法持久化与生产者的confirm机制配合当且仅当持久化了消息之后再confirm避免数据与消息丢失此时生产者收不到ack也是可以自己重发 消费端丢失了数据
意思就是消息已经拉取到了信息还没有处理注意这是已经告诉MQ我拉取到数据了结果进程挂了重启之后继续消费下一条消息导致中间的这一条没有消费到此时数据丢失了。
利用ack机制处理
取消RabbiMQ的自动ack也就是一个api可以在消费端消费完了消息之后再调用api告诉MQ我们收到并且处理了该消息。如果没有返回ackRabbitMQ会把该消息分配给其他的consumer处理消息不会丢失。通过配置处理
spring:rabbitmq:listener:simple:acknowledge-mode: manualKakfa的消息可靠性 生产者的消息可靠性
在kafka中可以在producer生产段设置一个参数也就是ackall要求每个数据必须写入所有的replica也就是所有该分区的副本才认为是接收成功。该参数设置的是你的leader接收到消息后所有的follower都同步到消息后才认为写成功
Properties props new Properties();
props.put(bootstrap.servers, localhost:9092);
props.put(acks, all); // 等待所有副本确认
props.put(retries, Integer.MAX_VALUE); // 无限重试
props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);
props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);KafkaProducerString, String producer new KafkaProducer(props);
ProducerRecordString, String record new ProducerRecord(my-topic, key, value);producer.send(record, (metadata, exception) - {if (exception null) {System.out.println(发送成功 metadata.offset());} else {exception.printStackTrace();}
});
producer.close();Kakfa的消息可靠性
kafka默认是会将消息持久化到磁盘上的但是还是有情况会导致丢失数据
kafka某个broker宕机随后重新选举partition的leader。倘若在该broker中的partition中的leader副本中的消息还没有被其他broker中的follower同步此时同步缺失的数据就丢失了也就是少了一些数据
解决方法
给 topic 设置 replication.factor 参数这个值必须大于 1要求每个 partition 必须有至少 2 个副本。在 Kafka 服务端设置 min.insync.replicas 参数这个值必须大于 1这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系。在 producer 端设置 acksall 这个是要求每条数据必须是写入所有 replica 之后才能认为是写成功了。在 producer 端设置 retriesMAX 很大很大很大的一个值无限次重试的意思这个是要求一旦写入失败就无限重试卡在这里了。
按照上面的配置之后leader的切换就不会导致数据缺失了。 消费者的消息可靠性
唯一可能也是类似于RabbitMQ中的也就是说你消费到该消息的时候消费者自动提交offset让kafka以为你消费好了该消息但是自己还没处理就宕机后会导致重启后没有消费该消息。
解决方法
关闭kafka默认的自动提交offset通过消费端业务逻辑处理完消息后再手动提交offset当然这里就是会导致重复消费了这里就是幂等性的问题了。比如你刚处理完还没提交 offset结果自己挂了此时肯定会重复消费一次
手动提交apiconsumer.commitSync();
Properties props new Properties();
props.put(bootstrap.servers, localhost:9092);
props.put(group.id, my-group);
props.put(enable.auto.commit, false); // 关闭自动提交
props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);
props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString, String consumer new KafkaConsumer(props);
consumer.subscribe(Arrays.asList(my-topic));while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records) {// 处理消息System.out.println(处理消息 record.value());}// 手动提交 offsetconsumer.commitSync();
}