当前位置: 首页 > news >正文

营销型网站设计房地产wordpress多媒体路径

营销型网站设计房地产,wordpress多媒体路径,深圳注册公司地址可以是住宅吗,安庆建设机械网站Storm 是一个分布式的实时计算框架#xff0c;可以很方便地对流式数据进行实时处理和分析#xff0c;能运用在实时分析、在线数据挖掘、持续计算以及分布式 RPC 等场景下。Storm 的实时性可以使得数据从收集到处理展示在秒级别内完成#xff0c;从而为业务方决策提供实时的数… Storm 是一个分布式的实时计算框架可以很方便地对流式数据进行实时处理和分析能运用在实时分析、在线数据挖掘、持续计算以及分布式 RPC 等场景下。Storm 的实时性可以使得数据从收集到处理展示在秒级别内完成从而为业务方决策提供实时的数据支持。 在美团点评公司内部实时计算主要应用场景包括实时日志解析、用户行为分析、实时消息推送、消费趋势展示、实时新客判断、实时活跃用户数统计等。这些数据提供给各事业群并作为他们实时决策的有力依据弥补了离线计算“T1”的不足。 在实时计算中用户不仅仅关心时效性的问题同时也关心消息处理的成功率。本文将通过实验验证 Storm 的消息可靠性保证机制文章分为消息保证机制、测试目的、测试环境、测试场景以及总结等五节。 Storm 的消息保证机制 Storm 提供了三种不同层次的消息保证机制分别是 At Most Once、At Least Once 以及 Exactly Once。消息保证机制依赖于消息是否被完全处理。 消息完全处理 每个从 SpoutStorm 中数据源节点发出的 TupleStorm 中的最小消息单元可能会生成成千上万个新的 Tuple形成一棵 Tuple 树当整棵 Tuple 树的节点都被成功处理了我们就说从 Spout 发出的 Tuple 被完全处理了。 我们可以通过下面的例子来更好地诠释消息被完全处理这个概念 TopologyBuilder builder new TopologyBuilder(); builder.setSpout(sentences, new KafkaSpout(spoutConfig), spoutNum); builder.setBolt(split, new SplitSentence(), 10).shuffleGrouping(sentences); builder.setBolt(count, new WordCount(), 20).fieldsGrouping(split, new Fields(word));这个 Topology 从 Kafka一个开源的分布式消息队列读取信息发往下游下游的 Bolt 将收到的句子分割成单独的单词并进行计数。每一个从 Spout 发送出来的 Tuple 会衍生出多个新的 Tuple从 Spout 发送出来的 Tuple 以及后续衍生出来的 Tuple 形成一棵 Tuple 树下图是一棵 Tuple 树示例 上图中所有的 Tuple 都被成功处理了我们才认为 Spout 发出的 Tuple 被完全处理。如果在一个固定的时间内这个时间可以配置默认为 30 秒有至少一个 Tuple 处理失败或超时则认为整棵 Tuple 树处理失败即从 Spout 发出的 Tuple 处理失败。 如何实现不同层次的消息保证机制 Tuple 的完全处理需要 Spout、Bolt 以及 AckerStorm 中用来记录某棵 Tuple 树是否被完全处理的节点协同完成如上图所示。从 Spout 发送 Tuple 到下游并把相应信息通知给 Acker整棵 Tuple 树中某个 Tuple 被成功处理了都会通知 Acker待整棵 Tuple 树都被处理完成之后Acker 将成功处理信息返回给 Spout如果某个 Tuple 处理失败或者超时Acker 将会给 Spout 发送一个处理失败的消息Spout 根据 Acker 的返回信息以及用户对消息保证机制的选择判断是否需要进行消息重传。 Storm 提供的三种不同消息保证机制中。利用 Spout、Bolt 以及 Acker 的组合我们可以实现 At Most Once 以及 At Least Once 语义Storm 在 At Least Once 的基础上进行了一次封装Trident从而实现 Exactly Once 语义。 Storm 的消息保证机制中如果需要实现 At Most Once 语义只需要满足下面任何一条即可 关闭 ACK 机制即 Acker 数目设置为 0Spout 不实现可靠性传输 Spout 发送消息是使用不带 message ID 的 API不实现 fail 函数Bolt 不把处理成功或失败的消息发送给 Acker如果需要实现 At Least Once 语义则需要同时保证如下几条 开启 ACK 机制即 Acker 数目大于 0Spout 实现可靠性传输保证 Spout 发送消息时附带 message 的 ID如果收到 Acker 的处理失败反馈需要进行消息重传即实现 fail 函数Bolt 在处理成功或失败后需要调用相应的方法通知 Acker实现 Exactly Once 语义则需要在 At Least Once 的基础上进行状态的存储用来防止重复发送的数据被重复处理在 Storm 中使用 Trident API 实现。 下图中每种消息保证机制中左边的字母表示上游发送的消息右边的字母表示下游接收到的消息。从图中可以知道At Most Once 中消息可能会丢失上游发送了两个 A下游只收到一个 AAt Least Once 中消息不会丢失可能重复上游只发送了一个 B 下游收到两个 BExactly Once 中消息不丢失、不重复因此需要在 At Least Once 的基础上保存相应的状态表示上游的哪些消息已经成功发送到下游防止同一条消息发送多次给下游的情况。 测试目的 Storm 官方提供 At Most Once、At Least Once 以及 Exactly Once 三种不同层次的消息保证机制我们希望通过相关测试达到如下目的 三种消息保证机制的表现是否与官方的描述相符At Most Once 语义下消息的丢失率和什么有关系、关系如何At Least Once 语义下消息的重复率和什么有关系、关系如何。测试环境 本文的测试环境如下: 每个 workerworker 为一个 物理 JVM 进程用于运行实际的 Storm 作业分配 1 CPU 以及 1.6G 内存。Spout、Bolt、Acker 分别跑在单独的 worker 上。并通过在程序中控制抛出异常以及人工 Kill Spout/Bolt/Acker 的方式来模拟实际情况中的异常情况。 三种消息保证机制的测试均由 Spout 从 Kafka 读取测试数据经由相应 Bolt 进行处理然后发送到 Kafka并将 Kafka 上的数据同步到 MySQL 方便最终结果的统计如下图所示 测试数据为 Kafka 上顺序保存的一系列纯数字数据量分别有十万、五十万、一百万等每个数字在每个测试样例中出现且仅出现一次。 测试场景 对于三种不同的消息保证机制我们分别设置了不同的测试场景来进行充分的测试。其中为了保证 Spout/Bolt/Acker 发生异常的情况下不影响其他节点在下面的测试中所有的节点单独运行在独立的 Worker 上。 At Most Once 从背景中可以得知如果希望实现 At Most Once 语义将 Acker 的数目设置为 0 即可本文的测试过程中通过把设置 Acker 为 0 来进行 At Most Once 的测试。 输入数据 保存在 Kafka 上的一系列纯数字数据量从十万到五百万不等每个测试样例中同一个数字在 Kafka 中出现且仅出现一次。 测试结果 异常次数测试数据总量结果集中不同 Tuple 的总量丢失的 Tuple 数据量Tuple 的丢失百分比Tuple 的重复量050000050000000%001000000100000000%002000000200000000%003000000300000000%0异常次数测试数据总量结果集中不同 Tuple 的总量丢失的 Tuple 数据量Tuple 的丢失百分比Tuple 的重复量1300000027749402250607.50%023000000230708769291323.09%033000000208282391717730.57%0430000001420725157927552.64%0结论 不发生异常的情况下消息能够不丢不重Bolt 发生异常的情况下消息会丢失不会重复其中消息的丢失数目与异常次数正相关。与官方文档描述相符符合预期。 At Least Once 为了实现 At Least Once 语义需要 Spout、Bolt、Acker 进行配合。我们使用 Kafka-Spout 并通过自己管理 offset 的方式来实现可靠的 SpoutBolt 通过继承 BaseBasicBolt自动帮我们建立 Tuple 树以及消息处理之后通知 Acker将 Acker 的数目设置为 1即打开 ACK 机制这样整个 Topology 即可提供 At Least Once 的语义。 测试数据 Kafka 上保存的十万到五十万不等的纯数字其中每个测试样例中每个数字在 Kafka 中出现且仅出现一次。 测试结果 Acker 发生异常的情况 异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数1)出现重复的 Tuple 数数据丢失数量最大积压量0100000100000--02000默认值0200000200000--020000300000300000--020000400000400000--02000异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数1)出现重复的 Tuple 数数据丢失数量最大积压量11000001000002200002000210000010000024001020003100000100000260000200041000001000002800002000Spout 发生异常的情况 异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数1)出现重复的 Tuple 数数据丢失数量0100000100000--00200000200000--00300000300000--00400000400000--0异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数1)出现重复的 Tuple 数数据丢失数量1100000100000220520210000010000024414041000001000002900806100000100000296900316750Bolt 发生异常的情况 调用 emit 函数之前发生异常 | 异常次数 | 结果集中不重复的 Tuple 数 | 数据重复的次数 (1) | 出现重复的 Tuple 数 | 数据丢失量 || — | — | — | — | — | |0 |100000| - |- |0| |0 |200000| - |- |0| |0 |300000| - |- |0| |0 |400000| - |- |0| | 异常次数 | 结果集中不重复的 Tuple 数 | 数据重复的次数 (1) | 出现重复的 Tuple 数 | 数据丢失量 || — | — | — | — | — | |1 |100000| - |- |0| |2 |100000| - |- |0| |4 |100000| - |- |0| |8 |100000| - |- |0| |10|100000| - |- |0| 调用 emit 函数之后发生异常 异常次数结果集中不重复的 Tuple 数数据重复的次数(1)出现重复的 Tuple 数数据丢失数量0100000--00200000--00300000--00400000--0异常次数结果集中不重复的 Tuple 数数据重复的次数(1)出现重复的 Tuple 数数据丢失数量1100000220210000023041000002508100000290101000002110结论 从上面的表格中可以得到消息不会丢失可能发生重复重复的数目与异常的情况相关。 不发生任何异常的情况下消息不会重复不会丢失。Spout 发生异常的情况下消息的重复数目约等于 spout.max.pending(Spout 的配置项每次可以发送的最多消息条数 * NumberOfException异常次数。Acker 发生异常的情况下消息重复的数目等于 spout.max.pending * NumberOfException。Bolt 发生异常的情况 emit 之前发生异常消息不会重复。emit 之后发生异常消息重复的次数等于异常的次数。结论与官方文档所述相符每条消息至少发送一次保证数据不会丢失但可能重复符合预期。 Exactly Once 对于 Exactly Once 的语义利用 Storm 中的 Trident 来实现。 测试数据 Kafka 上保存的一万到一百万不等的数字每个数字在每次测试样例中出现且仅出现一次。 测试结果 Spout 发生异常情况 |异常数 |测试数据量 |结果集中不重复的 Tuple 数 |结果集中所有 Tuple 的总和 | | — | — | — | — | |1 |10000 |10000 |50005000 | |2 |10000 |10000 |50005000 | |3 |10000 |10000 |50005000 | Acker 发生异常的情况 |异常数 |测试数据量 |结果集中不重复的 Tuple 数 |结果集中所有 Tuple 的总和 | | — | — | — | — | |1 |10000 |10000 |50005000 | |2 |10000 |10000 |50005000 | |3 |10000 |10000 |50005000 | Bolt 发生异常的情况 |异常数 |测试数据量 |结果集中不重复的 Tuple 数 |结果集中所有 Tuple 的总和 | | — | — | — | — | |1 |10000 |10000 |50005000 | |2 |10000 |10000 |50005000 | |3 |10000 |10000 |50005000 | 结论 在所有情况下最终结果集中的消息不会丢失不会重复与官方文档中的描述相符符合预期。 总结 对 Storm 提供的三种不同消息保证机制用户可以根据自己的需求选择不同的消息保证机制。 不同消息可靠性保证的使用场景 对于 Storm 提供的三种消息可靠性保证优缺点以及使用场景如下所示 可靠性保证层次优点缺点使用场景At most once处理速度快数据可能丢失都处理速度要求高且对数据丢失容忍度高的场景At least once数据不会丢失数据可能重复不能容忍数据丢失可以容忍数据重复的场景Exactly once数据不会丢失不会重复处理速度慢对数据不丢不重性质要求非常高且处理速度要求没那么高比如支付金额如何实现不同层次的消息可靠性保证 对于 At Least Once 的保证需要做如下几步 需要开启 ACK 机制即 Topology 中的 Acker 数量大于零Spout 是可靠的。即 Spout 发送消息的时候需要附带 msgId并且实现失败消息重传功能fail 函数 可以参考下面的 Spout 代码Bolt 在发送消息时需要调用 emitinputTuple, outputTuple进行建立 anchor 树参考下面建立 anchor 树的代码并且在成功处理之后调用 ack 处理失败时调用 fail 函数通知 Acker。不满足以上三条中任意一条的都只提供 At Most Once 的消息可靠性保证如果希望得到 Exactly Once 的消息可靠性保证可以使用 Trident 进行实现。 不同层次的可靠性保证如何实现 如何实现可靠的 Spout 实现可靠的 Spout 需要在 nextTuple 函数中发送消息时调用带 msgID 的 emit 方法然后实现失败消息的重传fail 函数参考如下示例: /*** 想实现可靠的 Spout需要实现如下两点* 1. 在 nextTuple 函数中调用 emit 函数时需要带一个 msgId用来表示当前的消息如果消息发送失败会用 msgId 作为参数回调 fail 函数* 2. 自己实现 fail 函数进行重发注意在 storm 中没有 msgId 和消息的对应关系需要自己进行维护*/ public void nextTuple() {//设置 msgId 和 Value 一样方便 fail 之后重发collector.emit(new Values(curNum , round ), curNum : round); }Override public void fail(Object msgId) {//消息发送失败时的回调函数 String tmp (String)msgId; //上面我们设置了 msgId 和消息相同这里通过 msgId 解析出具体的消息 String[] args tmp.split(:);//消息进行重发 collector.emit(new Values(args[0], args[1]), msgId); }如何实现可靠的 Bolt Storm 提供两种不同类型的 Bolt分别是 BaseRichBolt 和 BaseBasicBolt都可以实现可靠性消息传递不过 BaseRichBolt 需要自己做很多周边的事情建立 anchor 树以及手动 ACK/FAIL 通知 Acker使用场景更广泛而 BaseBasicBolt 则由 Storm 帮忙实现了很多周边的事情实现起来方便简单但是使用场景单一。如何用这两个 Bolt 实现不可靠的消息传递如下所示 //BaseRichBolt 实现不可靠消息传递 public class SplitSentence extends BaseRichBolt {//不建立 anchor 树的例子OutputCollector _collector;public void prepare(Map conf, TopologyContext context, OutputCollector collector) {_collector collector;}public void execute(Tuple tuple) {String sentence tuple.getString(0);for(String word: sentence.split( )) {_collector.emit(new Values(word)); // 不建立 anchor 树}_collector.ack(tuple); //手动 ack如果不建立 anchor 树是否 ack 是没有区别的这句可以进行注释}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields(word));} }//BaseRichBolt 实现可靠的 Bolt public class SplitSentence extends BaseRichBolt {//建立 anchor 树以及手动 ack 的例子OutputCollector _collector;public void prepare(Map conf, TopologyContext context, OutputCollector collector) {_collector collector;}public void execute(Tuple tuple) {String sentence tuple.getString(0);for(String word: sentence.split( )) {_collector.emit(tuple, new Values(word)); // 建立 anchor 树}_collector.ack(tuple); //手动 ack如果想让 Spout 重发该 Tuple则调用 _collector.fail(tuple);}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields(word));} }下面的示例会可以建立 Multi-anchoring ListTuple anchors new ArrayListTuple(); anchors.add(tuple1); anchors.add(tuple2); _collector.emit(anchors, new Values(1, 2, 3));//BaseBasicBolt 是吸纳可靠的消息传递 public class SplitSentence extends BaseBasicBolt {//自动建立 anchor自动 ackpublic void execute(Tuple tuple, BasicOutputCollector collector) {String sentence tuple.getString(0);for(String word: sentence.split( )) {collector.emit(new Values(word));}}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields(word));} }Trident 在 Trident 中Spout 和 State 分别有三种状态如下图所示 其中表格中的 Yes 表示相应的 Spout 和 State 组合可以实现 Exactly Once 语义No 表示相应的 Spout 和 State 组合不保证 Exactly Once 语义。下面的代码是一个 Trident 示例 OpaqueTridentKafkaSpout spout new OpaqueTridentKafkaSpout(spoutConf); //Opaque Spout//TransactionalTridentKafkaSpout spout new TransactionalTridentKafkaSpout(spoutConf); //Transaction SpoutTridentTopology topology new TridentTopology();String spoutTxid Utils.kafkaSpoutGroupIdBuilder(topologyConfig.kafkaSrcTopic, topologyConfig.topologyName);Stream stream topology.newStream(spoutTxid, spout).name(new stream).parallelismHint(1);// kafka configKafkaProducerConfig kafkaProducerConfig new KafkaProducerConfig(); //KafkaProducerConfig 仅对 kafka 相关配置进行了封装具体可以参考 TridentKafkaStateFactory2(MapString, String config)MapString, String kafkaConfigs kafkaProducerConfig.loadFromConfig(topologyConfig);TridentToKafkaMapper tridentToKafkaMapper new TridentToKafkaMapper(); //TridentToKafkaMapper 继承自 TridentTupleToKafkaMapperString, String实现 getMessageFromTuple 接口该接口中返回 tridentTuple.getString(0);String dstTopic test__topic_for_all;TridentKafkaStateFactory2 stateFactory new TridentKafkaStateFactory2(kafkaConfigs);stateFactory.withTridentTupleToKafkaMapper(tridentToKafkaMapper);stateFactory.withKafkaTopicSelector(new DefaultTopicSelector(dstTopic));stream.each(new Fields(bytes), new AddMarkFunction(), new Fields(word)) //从spout 出来数据是一个 bytes 类型的数据第二个是参数是自己的处理函数第三个参数是处理函数的输出字段.name(write2kafka).partitionPersist(stateFactory //将数据写入到 Kafka 中可以保证写入到 Kafka 的数据是 exactly once 的, new Fields(word), new TridentKafkaUpdater()).parallelismHint(1);
http://www.zqtcl.cn/news/970314/

相关文章:

  • 医疗机械网站怎么做无锡短视频seo
  • 做网站建设哪家公司好如何营销推广
  • 陕西百威建设监理有限司网站做吉祥物设计看什么网站
  • 网络营销站点推广的方法高端网站开发价格
  • 内部优惠券网站怎么做最新国际新闻事件今天
  • 辽宁大学网站怎么做app开发用什么编程语言
  • 3d建模在线制作网站阿里云域名注册官网
  • 创建网站大约多少钱网站建设排序题
  • 大庆做网站找谁机构编制网站建设
  • 网站标题特效网站弹出的对话框怎么做
  • 找深圳网站建设wordpress 页面背景
  • 企业网站怎么维护上海注册建网站
  • 四川省建设工程造价信息网站便宜做网站价格
  • 医院网站优化策划网站开发的项目需求
  • 网站优化公司服务直播软件怎么开发
  • 网站建设 有道翻译织梦修改网站后备份
  • 苏州网联盛网站建设做最好的在线看片网站
  • 一个空间怎么放2个网站陕西城乡住房建设部网站
  • 如何购买虚拟主机做网站企业查名
  • 动易网站默认密码网站怎么做 吸引人
  • 站长工具国产2023二级建造师证书查询官方网站
  • 微信小程序联盟网站北京网站建设华大
  • 人事怎么做招聘网站比对分析crm管理系统 一般包含
  • 林业网站建设有哪些北京微信小程序开发
  • ppt素材网站建设流程图网站开发原型工具
  • 乡镇医院网站建设成都市企业网站建设
  • 网站编辑如何做原创网站中英切换实例
  • 哈尔滨道外区建设局官方网站wordpress简称
  • 教师网站建设企业实践总结华为应用商店下载安装
  • 常见的网站空间服务商资阳建设局网站