当前位置: 首页 > news >正文

天猫注册店铺流程及费用赣榆网站建设xxiaoseo

天猫注册店铺流程及费用,赣榆网站建设xxiaoseo,视频剪辑培训,企业域名注册官方网址 源码#xff1a;https://kafka.apache.org/downloads 快速开始#xff1a;https://kafka.apache.org/documentation/#gettingStarted springcloud整合 发送消息流程 主线程#xff1a;主线程只负责组织消息#xff0c;如果是同步发送会阻塞#xff0c;如果是异…官方网址 源码https://kafka.apache.org/downloads 快速开始https://kafka.apache.org/documentation/#gettingStarted springcloud整合 发送消息流程 主线程主线程只负责组织消息如果是同步发送会阻塞如果是异步发送需要传入一个回调函数。 Map集合存储了主线程的消息。 Sender线程真正的发送其实是sender去发送到broker中。 源码阅读 1 首先打开Producer.send()可以看到里面的内容 // 返回值是一个 Future 参数为ProducerRecord FutureRecordMetadata send(ProducerRecordK, V record);// ProducerRecord定义了这些信息 // 主题 private final String topic; // 分区 private final Integer partition; // header private final Headers headers; private final K key; private final V value; // 时间戳 private final Long timestamp;2 发送之前的前置处理 public FutureRecordMetadata send(ProducerRecordK, V record, Callback callback) {// intercept the record, which can be potentially modified; this method does not throw exceptions// 这里给开发者提供了前置处理的勾子ProducerRecordK, V interceptedRecord this.interceptors.onSend(record);// 我们最终发送的是经过处理后的消息 并且如果是异步发送会有callback 这个是用户定义的return doSend(interceptedRecord, callback);}3 进入真正的发送逻辑Future doSend() 由于是网络通信所以我们要序列化在这个函数里面就做了序列化的内容。 try {serializedKey keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException cce) {throw new SerializationException(Cant convert key of class record.key().getClass().getName() to class producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() specified in key.serializer, cce);}byte[] serializedValue;try {serializedValue valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {throw new SerializationException(Cant convert value of class record.value().getClass().getName() to class producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() specified in value.serializer, cce);}然后我们获取分区 // 然后这里又是一个策略者模式 也是由用户可以配置的 DefaultPartitioner UniformStickyPartitioner RoundRobinPartitioner 提供了这样三个分区器 private int partition(ProducerRecordK, V record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {Integer partition record.partition();return partition ! null ?partition :partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }4 到了我们的RecordAccumulator也就是先由主线程发送到了RecordAccumulator // 也就是对图中的Map集合 RecordAccumulator.RecordAppendResult result accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);我们发现里面是用一个MAP存储的一个分区和ProducerBatch 是讲这个消息写到内存里面MemoryRecordsBuilder 通过这个进行写入 // 可以看到是一个链表实现的双向队列也就是消息会按append的顺序写到 内存记录中去 private final ConcurrentMapTopicPartition, DequeProducerBatch batches;5 接着我们看,我们append了以后会有一个判断去唤醒sender线程见下面的注释 // 如果说哦我们当前的 这个batch满了或者 我们创建了一个新的batch 这个时候唤醒 sender线程去发送数据 if (result.batchIsFull || result.newBatchCreated) {log.trace(Waking up the sender since topic {} partition {} is either full or getting a new batch, record.topic(), partition);// 唤醒sender 去发送数据this.sender.wakeup();}// 实现了Runnable 所以我们去看一下RUN方法的逻辑 public class Sender implements Runnable 好上来就是一个循环 while (running) {try {runOnce();} catch (Exception e) {log.error(Uncaught error in kafka producer I/O thread: , e);} }接着进入runOnece方法直接看核心逻辑 // 从RecordAccumulator 拿数据 然后发送 MapInteger, ListProducerBatch batches this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);addToInflightBatches(batches); // 中间省去了非核心逻辑 sendProduceRequests(batches, now);如果继续跟踪的话最终是走到了selector.send()里面 Send send request.toSend(destination, header);InFlightRequest inFlightRequest new InFlightRequest(clientRequest,header,isInternalRequest,request,send,now);this.inFlightRequests.add(inFlightRequest);selector.send(send);6 接着我们就要看返回逻辑了,可以看到在sendRequest里面sendProduceRequest方法是通过传入了一个回调函数处理返回的。 RequestCompletionHandler callback new RequestCompletionHandler() {public void onComplete(ClientResponse response) {handleProduceResponse(response, recordsByPartition, time.milliseconds());}};// 如果有返回 if (response.hasResponse()) {ProduceResponse produceResponse (ProduceResponse) response.responseBody();for (Map.EntryTopicPartition, ProduceResponse.PartitionResponse entry : produceResponse.responses().entrySet()) {TopicPartition tp entry.getKey();ProduceResponse.PartitionResponse partResp entry.getValue();ProducerBatch batch batches.get(tp);completeBatch(batch, partResp, correlationId, now, receivedTimeMs produceResponse.throttleTimeMs());}this.sensors.recordLatency(response.destination(), response.requestLatencyMs());} 追踪到ProducerBatch if (this.finalState.compareAndSet(null, tryFinalState)) {completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);return true;}private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {// Set the future before invoking the callbacks as we rely on its state for the onCompletion callproduceFuture.set(baseOffset, logAppendTime, exception);// execute callbacksfor (Thunk thunk : thunks) {try {if (exception null) {RecordMetadata metadata thunk.future.value();if (thunk.callback ! null)thunk.callback.onCompletion(metadata, null);} else {if (thunk.callback ! null)thunk.callback.onCompletion(null, exception);}} catch (Exception e) {log.error(Error executing user-provided callback on message for topic-partition {}, topicPartition, e);}}produceFuture.done();}Thunk 这个其实就是我们在Append的时候的回调 至此整个流程就完成了从发送消息到响应后回调我们的函数。 消息可靠性 // 所有消费者的配置都在ProducerConfig 里面 public static final String ACKS_CONFIG acks;acks 0异步形式单向发送不会等待 broker 的响应 acks 1主分区保存成功然后就响应了客户端并不保证所有的副本分区保存成功 acks all 或 -1等待 broker 的响应然后 broker 等待副本分区的响应总之数据落地到所有的分区后才能给到producer 一个响应 在可靠性的保证下假设一些故障 Broker 收到消息后同步 ISR 异常只要在 -1 的情况下其实不会造成消息的丢失因为有重试机制Broker 收到消息并同步 ISR 成功但是响应超时只要在 -1 的情况下其实不会造成消息的丢失因为有重试机制 可靠性能保证哪些不能保障哪些 保证了消息不会丢失不保证消息一定不会重复消息有重复的概率需要消费者有幂等性控制机制
http://www.zqtcl.cn/news/138687/

相关文章:

  • 柯桥区网站建设湖南人文科技学院
  • 建设一个网站需要哪些福田企业网站推广哪个好
  • 网站外链建设的15个小技巧中国农业建设中心网站
  • 交易平台网站怎么做wordpress 置顶 函数
  • 义乌市场官方网站jsp做就业网站
  • 推荐网站在线看兄弟们企业概况简介
  • 软装设计方案网站网站制作排名优化
  • 网站前端模板专业建站报价
  • 站长工具星空传媒怎么做游戏网站编辑
  • 大兴手机网站建设深圳小程序开发公司
  • c 大型网站开发案例电销系统线路
  • 鸿扬家装网站建设谈谈对seo的理解
  • 七米网站建设做网站也分内存大小的吗
  • 丝足网站的建设南宁关键词排名公司
  • 上饶商城网站建设亚马逊海外购官方网
  • 做网站代理商好赚吗高端品牌男鞋有哪些
  • 农产品网站建设及优化项目商务网站建设 视频
  • 北京兼职做网站建设百度app平台
  • 网站建设头部代码网站怎么做咨询
  • 网站运营 网站建设北京公司网站制作要多少钱
  • 郑州看妇科最好的医院是哪里南宁百度seo软件
  • 深圳市住房与建设局实名制网站手机网站打不开被拦截怎么办
  • 公司做网站的价格几千元wordpress 修改页脚
  • 专业网站建设公司在线咨询宁波网站推广公司价格
  • 网站搭建系统都有哪些丽水网站开发
  • 网站设计包含哪些技术外行怎么做网站
  • 网站建设运营知识推广软文平台
  • 营销型网站建设用途网站 文件夹结构
  • 制作网站建设策划方案cosy主题wordpress
  • 网站建设服务联享科技net和cn哪个做网站好