当前位置: 首页 > news >正文

兼职做商务标哪个网站中国网站设计公司

兼职做商务标哪个网站,中国网站设计公司,网页编辑word文档,软件开发专业课程有哪些目录 RabbitMQ保证消息可靠性 生产者丢失消息 MQ丢失消息 消费端丢失了数据 Kakfa的消息可靠性 生产者的消息可靠性 Kakfa的消息可靠性 消费者的消息可靠性 RabbitMQ保证消息可靠性 生产者丢失消息 1.事务消息保证 生产者在发送消息之前#xff0c;开启事务消息随后生…目录 RabbitMQ保证消息可靠性 生产者丢失消息 MQ丢失消息 消费端丢失了数据 Kakfa的消息可靠性 生产者的消息可靠性 Kakfa的消息可靠性 消费者的消息可靠性 RabbitMQ保证消息可靠性 生产者丢失消息 1.事务消息保证 生产者在发送消息之前开启事务消息随后生产者发送消息消息发送之后如果消息没有被MQ接收到的话生产者会收到异常报错生产者回滚事务然后重试消息如果收到了消息就能提交事务了 Autowired private RabbitTemplate rabbitTemplate;public void sendTransactionalMessage() {ConnectionFactory connectionFactory rabbitTemplate.getConnectionFactory();Channel channel connectionFactory.createConnection().createChannel(false);try {channel.txSelect(); // 开启事务channel.basicPublish(exchange, routing.key, null, message.getBytes());channel.txCommit(); // 提交事务} catch (Exception e) {channel.txRollback(); // 出错回滚} } 2.使用confirm机制 普通confirm机制就是发送消息之后等待服务器confirm之后再发送下一个消息 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - {if (ack) {System.out.println(消息成功发送到Broker);} else {System.out.println(消息发送失败原因 cause);} });rabbitTemplate.convertAndSend(exchange, routing.key, message); 批量confirm机制每发送一批消息之后等待服务器confirm Channel channel connection.createChannel(false); channel.confirmSelect();for (int i 0; i 100; i) {channel.basicPublish(exchange, routing.key, null, (msg i).getBytes()); } channel.waitForConfirms(); // 等待所有消息确认异步confirm机制服务器confirm一个或者多个消息之后客户端生产者能够通过回调函数来确定消息是否被confirm推荐 SortedSetLong pendingSet Collections.synchronizedSortedSet(new TreeSet()); channel.confirmSelect();channel.addConfirmListener(new ConfirmListener() {public void handleAck(long tag, boolean multiple) {if (multiple) pendingSet.headSet(tag 1).clear();else pendingSet.remove(tag);}public void handleNack(long tag, boolean multiple) {System.err.println(未确认消息 tag);if (multiple) pendingSet.headSet(tag 1).clear();else pendingSet.remove(tag);} });while (true) {long seq channel.getNextPublishSeqNo();channel.basicPublish(demo.exchange, demo.key,MessageProperties.PERSISTENT_TEXT_PLAIN, hello.getBytes());pendingSet.add(seq); }MQ丢失消息 防止MQ的丢失数据的话方法就是开启RabbitMQ的持久化消息写入之后也就是到了MQ之后就直接持久化到磁盘中即使Rabbimq自己挂了之后会恢复数据。 设置持久化步骤 创建queue的时候直接设置持久化此时就能持久化queue的元数据不是消息 Bean public Queue durableQueue() {return new Queue(myQueue, true); // true 表示持久化 }发送消息的时候指定消息为deliveryMode设置为2也就是设置消息为持久化此时消息可以持久化磁盘上 MessageProperties props new MessageProperties(); props.setDeliveryMode(MessageDeliveryMode.PERSISTENT); Message message new Message(content.getBytes(), props); rabbitTemplate.send(exchange, routing.key, message);极端情况 消息写到RabbitMQ之后但是还没有持久化到磁盘之后直接挂了导致内存中消息丢失。 解决方法持久化与生产者的confirm机制配合当且仅当持久化了消息之后再confirm避免数据与消息丢失此时生产者收不到ack也是可以自己重发 消费端丢失了数据 意思就是消息已经拉取到了信息还没有处理注意这是已经告诉MQ我拉取到数据了结果进程挂了重启之后继续消费下一条消息导致中间的这一条没有消费到此时数据丢失了。 利用ack机制处理 取消RabbiMQ的自动ack也就是一个api可以在消费端消费完了消息之后再调用api告诉MQ我们收到并且处理了该消息。如果没有返回ackRabbitMQ会把该消息分配给其他的consumer处理消息不会丢失。通过配置处理 spring:rabbitmq:listener:simple:acknowledge-mode: manualKakfa的消息可靠性 生产者的消息可靠性 在kafka中可以在producer生产段设置一个参数也就是ackall要求每个数据必须写入所有的replica也就是所有该分区的副本才认为是接收成功。该参数设置的是你的leader接收到消息后所有的follower都同步到消息后才认为写成功 Properties props new Properties(); props.put(bootstrap.servers, localhost:9092); props.put(acks, all); // 等待所有副本确认 props.put(retries, Integer.MAX_VALUE); // 无限重试 props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer); props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);KafkaProducerString, String producer new KafkaProducer(props); ProducerRecordString, String record new ProducerRecord(my-topic, key, value);producer.send(record, (metadata, exception) - {if (exception null) {System.out.println(发送成功 metadata.offset());} else {exception.printStackTrace();} }); producer.close();Kakfa的消息可靠性 kafka默认是会将消息持久化到磁盘上的但是还是有情况会导致丢失数据 kafka某个broker宕机随后重新选举partition的leader。倘若在该broker中的partition中的leader副本中的消息还没有被其他broker中的follower同步此时同步缺失的数据就丢失了也就是少了一些数据 解决方法 给 topic 设置 replication.factor 参数这个值必须大于 1要求每个 partition 必须有至少 2 个副本。在 Kafka 服务端设置 min.insync.replicas 参数这个值必须大于 1这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系。在 producer 端设置 acksall 这个是要求每条数据必须是写入所有 replica 之后才能认为是写成功了。在 producer 端设置 retriesMAX 很大很大很大的一个值无限次重试的意思这个是要求一旦写入失败就无限重试卡在这里了。 按照上面的配置之后leader的切换就不会导致数据缺失了。 消费者的消息可靠性 唯一可能也是类似于RabbitMQ中的也就是说你消费到该消息的时候消费者自动提交offset让kafka以为你消费好了该消息但是自己还没处理就宕机后会导致重启后没有消费该消息。 解决方法 关闭kafka默认的自动提交offset通过消费端业务逻辑处理完消息后再手动提交offset当然这里就是会导致重复消费了这里就是幂等性的问题了。比如你刚处理完还没提交 offset结果自己挂了此时肯定会重复消费一次 手动提交apiconsumer.commitSync(); Properties props new Properties(); props.put(bootstrap.servers, localhost:9092); props.put(group.id, my-group); props.put(enable.auto.commit, false); // 关闭自动提交 props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer); props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString, String consumer new KafkaConsumer(props); consumer.subscribe(Arrays.asList(my-topic));while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records) {// 处理消息System.out.println(处理消息 record.value());}// 手动提交 offsetconsumer.commitSync(); }
http://www.zqtcl.cn/news/967445/

相关文章:

  • 新都区网站建设网站设计公司排行榜
  • 网站建设需求分析调研表建筑品牌网站
  • html5商城网站如何查询网站建设者
  • 做重视频网站教育网站改版方案
  • 小网站谁有网站上线后做什么
  • 松江网站建设培训手机网站你们
  • 荆州网站建设 众火网北京小客车指标调控管理信息系统
  • 域名和网站一样吗自己开发小程序要多少钱
  • 咨询公司网站源码手机优化软件哪个好用
  • 行业网站模板小型影视网站源码
  • 湖北网站建站系统哪家好微信小程序怎么注销账号
  • 温州网站推广公司沈阳网站建设服务电话
  • 2019年的阜南县建设修路网站洛阳哪里有做网站的
  • 家里电脑可以做网站服务器吗佛山网络公司哪家最好
  • 做网站属于无形资产还是费用网站制作二维码
  • ps为什么做不了视频网站最近做网站开发有前途没
  • 平面设计师参考网站做网站建设推广好做吗
  • 网站被别的域名绑定泰安做网站网络公司
  • 建设部网站业绩如何录入免费素材图片下载
  • 佛山美容网站建设如何有效的推广宣传
  • 网站全屏轮播怎么做nginx 代理 wordpress
  • 海淀公司网站搭建二级目录怎么做网站
  • 石家庄定制网站建设凡科建站做的网站收录慢吗
  • 海口企业自助建站品牌建设三年行动方案
  • 网站建设流程平台域名分析网站
  • 旅游类网站如何做推广随机网站生成器
  • 竖导航网站做网站被坑
  • 散文古诗网站建设目标做公司网站要钱吗
  • 营销网站建设规划小浪底水利枢纽建设管理局网站
  • 建站的目的网站的月度流量统计报告怎么做