当前位置: 首页 > news >正文

株洲网站制作短视频运营方案策划书

株洲网站制作,短视频运营方案策划书,做个网站一般多少钱,厦门小程序开发公司排名这篇文章总结了我在设计Storm Trident拓扑时当前的错误处理方法。 我在这里关注代码设计#xff0c;而不是监督或冗余之类的部署良好实践。 由于Storm的实时流性质#xff0c;当面对大多数错误时#xff0c;我们最终将不得不移至下一个数据。 在这种情况下#xff0c;错误… 这篇文章总结了我在设计Storm Trident拓扑时当前的错误处理方法。 我在这里关注代码设计而不是监督或冗余之类的部署良好实践。 由于Storm的实时流性质当面对大多数错误时我们最终将不得不移至下一个数据。 在这种情况下错误处理归结为或没有报告此错误并在以后或没有重试处理失败的输入数据。 这篇文章的第1部分是关于这方面的。 这意味着在处理元组时通常很难确定它是我们第一次遇到它还是它的内容已经部分地应用于持久性。 因此我们需要使状态更新操作成为幂等这是本文的第二部分。 不要对这篇文章的大小印象深刻Storm实际上为我们完成了大部分工作。 真正需要做的只是了解如何以合理的方式插入东西。 这篇文章基于Storm 0.9Cassandra 2.0.4和Kafka 0.7。 我在github上放置了一个玩具项目 以说明下面讨论的几点。 该项目实际上是根据我在上一篇文章中介绍的“房间存在”示例改编的 。 第1部分处理错误情况 决定何时要求重试 第一个简单的错误处理策略是简单地接受运行时错误导致的计算质量下降。 例如如果拓扑在最近的滑动窗口上计算一些实时趋势估计或者如果我们已经在处理诸如Twitter公开流之类的采样数据则可能是这种情况。 如果我们选择忽略此类错误则实现起来非常简单只需用大量的try / catch包装拓扑逻辑以某种方式报告错误并且不要让任何事情冒充Storm。 但是在大多数情况下我们关心一致性因此必须对尝试重试或不尝试失败的数据做出谨慎的决定。 运行时错误的一个典型示例是入站数据格式问题。 在那种情况下重试当然是没有意义的因为它不会第二次变得更好。 相反我们应该记录故障数据并可能要求某些人进行调查。 这是我的玩具项目中BytesToString Storm函数的一个简单示例 public class BytesToString extends BaseFunction { Overridepublic void execute(TridentTuple tuple, TridentCollector tridentCollector) { try { String asString new String((byte[]) tuple.getValueByField(bytes), UTF-8); tridentCollector.emit(new Values(asString)); } catch (UnsupportedEncodingException e) { logger.err(ERROR: lost data: unable to parse inbound message from Kafka (expecting UTF-8 string), e); } } 另一方面如果错误与某些不可访问的外部数据源有关例如由网络分区引起的错误我们应按下一节所述触发重试。 除上述两种错误外还有许多其他类型的错误但要点仍然是区分可重试错误与不可重试错误并做出相应反应很重要。 最后一点当您决定不报告在IBackingMap的multiget中发生的错误时请格外小心 因为该函数必须返回与输入键列表大小相同的列表。 因此如果出现不可重试的错误我们必须以某种方式返回某些结果。 在大多数情况下如果我们选择不重试这种情况下的错误那是因为某些过去的错误已经在持久性方面破坏了某些内容并且为时已晚。 在下面的示例中由于对从DB读取的某些数据进行的解析失败而发生错误并且代码仅返回null值这等同于考虑到持久性没有任何作用至少没有用处。 另请参阅下面的第3部分以了解针对这种情况的可能解决方案。 Override public ListOpaqueValue multiGet(ListListObject keys) { try { return Utils.opaqueStringToOpaqueValues(opaqueStrings, HourlyTimeline.class); } catch (IOException e) { logger.err(error while trying to deserialize data from json giving up (data is lost!), e); return Utils.listOfNulls(keys.size()); // this assumes previous state does not exist destroys data! } } 好吧来自TimelineBackingMap的这段代码实际上将所有数据替换为null这使情况变得更糟但这是一个玩具项目…… 导致三叉戟元组被重播… 一旦确定触发元组重播是合理的我们只需要询问它Storm就会做其他所有事情只需插入正确的喷嘴请参阅下一节。 从技术上讲这很简单从功能或过滤器之类的Trident原语中触发重试就像抛出FailedException一样简单就像玩具项目中的TimeLineBackingMap中一样其中包括重试和非重试错误的示例请注意代码下面来自TimelineBackingMap的示例假定任何数据库错误都是可重试的这过于简化了 Override public void multiPut(ListListObject keys, ListOpaqueValue timelines) {;ListOpaqueValue jsonOpaqueTimelines; try { jsonOpaqueTimelines Utils.opaqueValuesToOpaqueJson(timelines); } catch (IOException e) { System.err.println(error while trying to serialize data to json giving up (data is lost!)); return; }if (jsonOpaqueTimelines ! null) { try { DB.put(room_timelines, toSingleKeys(keys), jsonOpaqueTimelines); } catch (Exception e) { logger.err(error while storing timelines to cassandra, triggering a retry..., e); throw new FailedException(could not store data into Cassandra, triggering a retry..., e); } } }; 然后Storm会将错误传播回喷嘴以强制重播元组。 如果我们希望在Storm UI中报告错误则可以抛出ReportedFailedException。 我强烈不建议使用的另一种方法是让任何其他类型的RuntimeException冒泡到Storm。 这本质上以更高的性能成本实现了相同的结果它将触发工作节点崩溃并且Nimbus将自动重启并且所有spout将恢复从最新的已知成功索引中读取spout实现如Kafka spout将其最新成功处理的偏移存储在zookeeper中为了这个目的。 这种快速失败策略是Storm设计的一部分请参阅有关工人监督和容错的文档。 从本质上讲这实现了与让spout重播某些元组相同的一致性保证但是对性能的影响当然更大因为我们具有完整的JVM重新启动并重置了所有当前正在运行的拓扑实例。 因此切勿故意这样做。 仍然令人放心的是如果我们的节点崩溃数据不会中断并且流量自然会继续。 Storm决定重播元组的第三种情况是它们是否在配置的超时之前未到达拓扑的末尾。 更确切地说如果未按时收到ACK则该机制实际上是由发出该元组的spout触发的因此如果元组成功处理但由于某些网络分区ACK无法到达该spout则也可以触发这些重播。 用于控制此设置的Storm参数是topology.enable.message.timeouts和topology.message.timeout.secs 根据defaults.yaml的默认值为“ true”和30秒。 这只是为什么拓扑中的幂等性如此重要的又一个原因。 …并实际上重播元组 一旦失败通知到达喷嘴或在超时情况下由通知生成我们需要确保失败的元组将被重播。 除非您自己开发喷嘴否则只能归结为选择正确的喷嘴口味 。 此选择会影响元组的重播或不重播方式因此它必须与适当的策略保持一致以处理拓扑中的已重播的元组这是下一部分的主题。 有3种喷口 非事务性无保证但如果您选择的实现提供“至少一次”保证在某些情况下它们仍然有用 事务性的不建议使用因为它们在某些分区情况下可能会阻止拓扑 opaque不透明就重播而言它们达到元组至少会被播放一次但在重播方面提供了弱保证但在重播的情况下发出的批次可能会不同。 在实践中使用它们时我所建议的所有重要事项是确保拓扑对于这种灵活的重放具有鲁棒性这将在下一部分中进行讨论。 关于元组和批处理重播的最后说明 我在元组级别上进行了讨论因为这使设计决策更简单。 实际上要求Storm重播单个元组将触发同一批中包含的许多其他元组的重播其中一些可能没有错误。 第2部分重播元组的幂等处理 故事的另一面是既然我们知道元组可能会被处理几次请确保拓扑是幂等的即发送相同元组的次数不会使状态不一致。 没有副作用的拓扑部分当然不受元组重播的影响。 关于状态一致性的Storm Trident文档非常清楚因此我在这里仅添加一些内容。 如果我们的状态更新操作已经幂等 如果状态更新操作本质上已经是幂等的那么它已经具有元组重播的弹性并且不需要Storm特殊机制。 如果id值完全基于入站元组内容则任何“按id存储”操作都是这种情况。 例如在我的玩具项目中我存储了占用会话这些会话的主键是从入站事件中找到的相关ID派生的因此在这种情况下写操作已经可以重播了因为任何重播都只会覆盖相同的现有数据信息而不会破坏任何数据假设我们有订购保证在这种情况下是正确的。 public void multiPut(ListListObject keys, ListRoomPresencePeriod newOrUpdatedPeriods) { DB.upsertPeriods(newOrUpdatedPeriods); } 在CassandraDB.java中 try { PreparedStatement statement getSession().prepare(INSERT INTO presence (id, payload) values (?,?)); execute(new BoundStatement(statement).bind(rpp.getId(), periodJson)); } catch (Exception e) { logger.error(error while contacting Cassandra, triggering a retry..., e); new FailedException(error while trying to record room presence in Cassandra , e); } 同时使read-update-write操作成为幂等 我在先前的博客文章中描述了Storm如何使我们能够实现执行以下操作而不需要DB锁并且仍然避免出现竞争情况 从数据库读取以前的状态 根据新的元组数据更新内存中的状态 将新状态保存到数据库 风暴的美丽之处在于为了处理重播的元组而不破坏状态我们只需要调整步骤1和3。这是非常重要的我们现在可以在步骤2中实现所有处理逻辑就像每个元组只被播放一次然后根本不关心重播只要我们是“纯”的请参见下面的评论…。 这就是“风暴只有一次语义”的含义。 而且如果我们在内部实现1和3则使它们重播即可只是将它们与现有的Storm类包装在一起即可。 最健壮的方式是使用Opaque逻辑但代价是每个状态存储两次状态如Trident文档中关于transaction spout的说明 。 更好的是已经有很多不透明的BackingMap实现可用于Storm-contrib中的诸如Cassandra或Mysql的许多后端因此在大多数情况下除了选择正确的之外实际上没有任何其他事情可做。 最重要的一点是要使用处理重播元组的不透明BackingMap必须使用尊重不透明先决条件的喷嘴如本矩阵所述 。 如果由于某种原因需要实现自己的BackingMap我们唯一要做的就是使它存储数据的当前和先前版本以及交易ID。 这是我的玩具项目中的一个简单示例但实际上在编写类似代码之前请考虑一下Storm-contrib public void put(String table, ListString keys, ListOpaqueValue opaqueStrings) {;// this should be optimized with C* batches... for (PairString, OpaqueValue keyValue : Utils.zip(keys, opaqueStrings)) { PreparedStatement statement getSession().prepare(format(INSERT INTO %s (id, txid, prev, curr) values (?, ?, ?, ?), table)); OpaqueValue opaqueVal keyValue.getValue(); execute(new BoundStatement(statement).bind(keyValue.getKey(), opaqueVal.getCurrTxid(), opaqueVal.getCurr(), opaqueVal.getPrev())); } } public ListOpaqueValue get(String table, ListString keys) {;ListOpaqueValue vals new ArrayList(keys.size()); ResultSet rs execute(format(select id, txid, prev, curr from %s where id in ( %s ) , table, toCsv(keys) )); MapString, OpaqueValue data toMapOfOpaque(rs); for (String key: keys){ vals.add(data.get(key)); }return vals; } 然后要真正获得Trident的一次语义唯一要做的就是将其包装在OpaqueMap中如下所示 public static StateFactory FACTORY new StateFactory() { public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { return OpaqueMap.build(new TimelineBackingMap(new CassandraDB(conf))); } } 幕后发生的事情是 OpaqueMap将根据与当前批处理元组关联的事务ID和在持久性中找到的事务ID选择要显示给我们的更新逻辑的先前存储的状态“ curr”或“ prev”。 该事务ID是由喷嘴提供的因此这就是保持喷嘴与状态选择对齐如此重要的原因状态对每个事务ID的含义进行假设。 不要破坏前一个实例 让我们回到上面提到的read-update-write序列的步骤2。 既然我们知道不透明逻辑需要存储任何状态的新版本和旧版本请查看以下Reducer代码并尝试确定其损坏原因 public RoomPresencePeriod reduce(RoomPresencePeriod curr, TridentTuple tuple) { LocationChangedEvent event (LocationChangedEvent) tuple.getValueByField(occupancyEvent);;if (ENTER event.getEventType()) { curr.setStartTime(event.getTime()); // buggy code } else { curr.setEndTme(event.getTime()); // buggy code } return curr; } 函数式编程的专家称其为“不纯”方法因为它会修改其输入参数。 它破坏Storm不透明逻辑的原因是现在“当前”和“先前” java引用实际上都引用内存中的同一实例。 因此当不透明逻辑同时保留某个状态的先前版本和当前版本时实际上它保存的是新版本的两倍因此先前的版本丢失了。 更好的实现可能是这样的 public RoomPresencePeriod reduce(RoomPresencePeriod curr, TridentTuple tuple) { LocationChangedEvent event (LocationChangedEvent) tuple.getValueByField(occupancyEvent);;RoomPresencePeriod updated new RoomPresencePeriod(curr); // copy constructor if (ENTER event.getEventType()) { updated.setStartTime(event.getTime()); } else { updated.setEndTme(event.getTime()); } return updated; }第3部分人为错误全部重播 最后一点我们必须谦虚地意识到无论我们采取了多少上述努力和保障我们仍然会在生产环境中部署错误对此我发誓抱歉。 对于数据处理平台错误可能意味着破坏数据的错误当数据是我们的业务时这是很糟糕的。 在某些情况下我们只会发现事实之后数据已损坏就像上面有关multiget的注释中所述。 内森·马兹Nathan Marz在他的《 大数据》书中 描述了一个简单的基于Lambda架构的“重播所有”想法以解决该想法。 这本书的简短摘要也可以在这里找到 。 参考来自Svend博客的 JCG合作伙伴 Svend Vanderveken 在Storm Trident拓扑中的错误处理 。 翻译自: https://www.javacodegeeks.com/2014/02/error-handling-in-storm-trident-topologies.html
http://www.zqtcl.cn/news/383434/

相关文章:

  • 绥化网站建设兼职互联网大厂设计哪家口碑好
  • 成交型网站建设公司六安亿联网络科技有限公司
  • 优秀行业网站广州网站建设怎么样
  • 南宁建设信息网seo推广公司排名
  • 凯发网站国外网站博客网站也可以做引流
  • 网站设计要学什么vestacp wordpress
  • 模板建站代理3免费做网站
  • 酒店官方网站的功能建设百度网盟推广案例
  • 屯昌网站建设wap网站搭建
  • 毕设做音乐网站重庆正云环境网页制作
  • 免费网站建站w深圳罗湖建网站
  • 创建一个网站一般步骤有哪些互动网站策划
  • 文化传媒 网站设计宿迁网站建设价格
  • 网站开发五人分工是网站推广的案例
  • 海外网站制作seo技术
  • 包头网站建设熊掌号免费行情100个软件
  • 江门网站制作维护电子商务网站运营与管理
  • 动画网页制作网站常用的网络推广方法有
  • 一个设计网站多少钱sku电商是什么意思
  • 做网站优化有前景吗emlog和wordpress
  • 30天网站建设实录 pdf货源网站程序
  • 做企业网站需要多久培训机构 网站建设
  • 商业网站初期建设资金预算哈尔滨视频制作公司
  • 网站建设教程网哪个好wordpress 侧边栏 固定
  • 对网站主要功能界面进行赏析软件开发和app开发的区别
  • 西安市高陵区建设局网站如何重新安装电脑上的wordpress
  • 合肥网站快速优化排名全球人口多少亿
  • 中山网站关键字优化使用动易模版制作网站
  • 深圳营销网站建设报价广西住房建设厅网站
  • 爱站网appwordpress图片500