当前位置: 首页 > news >正文

网站公司seo深鑫辉网站建设

网站公司seo,深鑫辉网站建设,亚马逊网站特点,网站建设宗旨及商业模式使用storm 实时计算在本文中#xff0c;我将说明如何借助Storm框架以可扩展且无锁定的方式在数据库中维护实时事件驱动流程的当前状态。 Storm是基于事件的数据处理引擎。 它的模型依赖于基本原语#xff0c;例如事件转换#xff0c;过滤#xff0c;聚合……#xff0c;我… 使用storm 实时计算 在本文中我将说明如何借助Storm框架以可扩展且无锁定的方式在数据库中维护实时事件驱动流程的当前状态。 Storm是基于事件的数据处理引擎。 它的模型依赖于基本原语例如事件转换过滤聚合……我们将它们组合成拓扑 。 拓扑的执行通常分布在多个节点上并且风暴群集还可以并行执行给定拓扑的多个实例。 因此在设计时必须牢记哪些Storm原语在分区范围内执行即在一个群集节点的级别上执行以及哪些在群集范围内执行又称为重新分区操作 因为它们涉及将事件从中移出的网络流量。分区到分区。 Storm Trident API文档明确提到了哪些功能做什么作用范围如何。 Storm的分区概念与Kafka队列的分区概念保持一致 Kafka队列是入站事件的常见来源。 拓扑通常需要维护一些执行的持续状态。 例如这可以是一些传感器值的滑动窗口平均值从推文中提取的近期情绪在不同位置出现的人数。……由于某些状态更新操作具有分区范围例如partitionAggregate 因此可伸缩性模型在这里尤为重要。其他则具有集群范围例如groupby perstitentAggregate的组合。 这篇文章中说明了这一点。 示例代码在githup上可用 。 它基于Storm 0.8.2Cassandra 1.2.5和JDK 1.7.0。 请注意该示例未包含适当的错误处理喷口或螺栓均不支持重试失败的元组我将在以后的文章中解决。 另外我使用Java序列化将数据存储在元组中因此即使Storm支持多种语言我的示例也是特定于Java的。 实际示例出席事件 我的示例是模拟一个跟踪人们在建筑物内位置的系统。 每当用户进入或离开房间时每个房间入口处的传感器都会发出如下事件 {eventType: ENTER, userId: John_5, time: 1374922058918, roomId: Cafetaria, id: bf499c0bd09856e7e0f68271336103e0A, corrId: bf499c0bd09856e7e0f68271336103e0} {eventType: ENTER, userId: Zoe_15, time: 1374915978294, roomId: Conf1, id: 3051649a933a5ca5aeff0d951aa44994A, corrId: 3051649a933a5ca5aeff0d951aa44994} {eventType: LEAVE, userId: Jenny_6, time: 1374934783522, roomId: Conf1, id: 6abb451d45061968d9ca01b984445ee8B, corrId: 6abb451d45061968d9ca01b984445ee8} {eventType: ENTER, userId: Zoe_12, time: 1374921990623, roomId: Hall, id: 86a691490fff3fd4d805dce39f832b31A, corrId: 86a691490fff3fd4d805dce39f832b31} {eventType: LEAVE, userId: Marie_11, time: 1374927215277, roomId: Conf1, id: 837e05916349b42bc4c5f65c0b2bca9dB, corrId: 837e05916349b42bc4c5f65c0b2bca9d} {eventType: ENTER, userId: Robert_8, time: 1374911746598, roomId: Annex1, id: c461a50e236cb5b4d6b2f45d1de5cbb5A, corrId: c461a50e236cb5b4d6b2f45d1de5cbb5} 对“ ENTER”和“ LEAVE”对中的每个事件与一个房间内一个用户的一个占用时间段相对应。 这可能对传感器提出了很多要求但是出于本示例的目的这使我的生活更加轻松 。 为了使事情变得有趣让我们想象一下不能保证到达我们服务器的事件遵循时间顺序请参见生成事件的python脚本中的shuffle调用。 我们将构建一个Storm拓扑该拓扑将构建每个房间的每分钟每分钟的占用时间线如本文结尾处的时间图所示。 在数据库中房间时间线被切成一个小时的时间段这些时间段被独立存储和更新。 这是Cafetaria占用1小时的示例 {roomId:Cafetaria,sliceStartMillis:1374926400000,occupancies:[11,12,12,12,13,15,15,14,17,18,18,19,20,22,22,22,21,22,23,25,25,25,28,28,33,32,31,31,29,28,27,27,25, 22,22,21,20,19,19,19,17,17,16,16,15,15,16,15,14,13,13,12,11,10,9,11,10,9,11,10]} 为了实现这一点我们的拓扑需要 根据correlationID重新组合“ ENTER”和“ LEAVE”事件并为此用户在此房间中产生相应的存在时间 将每个在场期间的影响应用于房间入住时间表 顺便说一句Cassandra提供了Counter列 尽管我可以很好地替代它们但我在这里不使用它们。 但是我的目的是说明Storm功能即使它会使方法有些虚构。 分组依据/ persistentAggregate / iBackingMap说明 在查看示例代码之前让我们澄清一下这些“三叉戟风暴”原语如何协同工作。 想象一下我们从上午9:47到上午10:34收到了两个描述用户在roomA中存在​​的事件。 更新会议室的时间表需要 从数据库加载两个受影响的时间轴切片[9.00am10:00 am]和[10.00am11:00 am] 在这两个时间轴切片中添加此用户的状态 将它们保存到数据库 但是像这样天真地实现此目标并不是最佳选择首先是因为它每个事件使用两个DB请求其次是因为这种“读取-更新-写入”序列通常需要一种锁定机制这种锁定机制通常无法很好地扩展。 为了解决第一点我们想为几个事件重新组合数据库操作。 在Storm中事件或元组 被成批处理。 IBackingMap是一个我们可以实现的原语它使我们可以立即查看整批元组。 我们将使用它在批处理的开始multiget和结束时的所有DB-write操作multiput重新分组。 但是multiget不允许我们查看元组本身而只能查看“查询键”这是根据元组内容计算出来的如下所述。 原因在于上面提到的关于天真的实现的第二点我们想并行执行几个[multiget 更新逻辑 multiput]流而不依赖锁。 这是通过确保那些并行子流程更新不相交的数据集来实现的。 这就要求定义拆分成并行流的拓扑元素还控制每个流内DB中要加载和更新的数据。 该元素是Storm groupBy原语它通过按字段值对元组进行分组来定义拆分并且它通过将“ groupedBy”值作为对multiget的查询关键字来控制每个并行流更新的数据。 下图在房间占用示例中对此进行了说明简化为每个房间仅存储一个时间线而不是每个一小时的时间片一个时间线 但是并行性并没有完全发生例如当前的Storm实现在分组流中依次调用每个reducer / combiner但这是设计拓扑时要牢记的一个好模型。 有趣的是在groupBy和multiget之间发生了一些Storm魔术。 回想一下Storm旨在进行大规模分布这意味着每个流在多个节点上并行执行从诸如Hadoop HDFS或分布式Kafka队列之类的分布式数据源获取输入数据。 这意味着groupBy同时在多个节点上执行所有可能处理的事件都需要组合在一起。 groupBy是一个重新分区操作 可确保将所有需要分组的事件发送到同一节点并由IBackingMap 组合器或约简器的同一实例处理因此不会发生争用情况。 同样Storm要求我们将IBackingMap包装到可用的Storm MapState原语或我们自己的原语之一中通常用于处理失败/重播的元组。 如上所述我不在本文中讨论这一方面。 使用这种方法我们必须实现IBackingMap以便它尊重以下属性 对于不同的键值由multiget读取和由IBackingMap的multiput操作写入的数据库行必须是不同的。 我想这就是他们将这些值称为“关键”的原因 尽管任何尊重此属性的方法都可以。 回到例子 让我们看看这在实践中是如何工作的。 该示例的主要拓扑在此处可用 // reading events .newStream(occupancy, new SimpleFileStringSpout(data/events.json, rawOccupancyEvent)) .each(new Fields(rawOccupancyEvent), new EventBuilder(), new Fields(occupancyEvent)) 第一部分只是读取JSON格式的输入事件我正在使用简单的文件输出对它们进行反序列化然后使用Java序列化将它们放入称为“ occupancyEvent”的元组字段中。 这些元组中的每一个都描述了用户在房间内或房间外的“ ENTER”或“ LEAVE”事件。 // gathering enter and leave events into presence periods .each(new Fields(occupancyEvent), new ExtractCorrelationId(), new Fields(correlationId)) .groupBy(new Fields(correlationId)) .persistentAggregate( PeriodBackingMap.FACTORY, new Fields(occupancyEvent), new PeriodBuilder(), new Fields(presencePeriod)) .newValuesStream() 当我们遇到correlationId的不同值时groupBy原语会创建尽可能多的元组组这可能意味着很多因为通常最多两个事件具有相同的correlationId。 当前批处理中具有相同相关ID的所有元组将重新组合在一起并且一组或几组元组将一起呈现给persistentAggregate中定义的元素。 PeriodBackingMap是IBackingMap的实现其中实现了multiget方法该方法将接收下一步将要处理的元组组的所有相关ID例如{“ roomA”“ roomB”“ Hall ”}如上图所示。 public ListRoomPresencePeriod multiGet(ListListObject keys) {return CassandraDB.DB.getPresencePeriods(toCorrelationIdList(keys)); } 该代码只需要从数据库中检索每个相关ID的潜在存在期间即可。 因为我们对一个元组字段进行了groupBy所以每个List在这里都包含一个单个StringcorrelationId。 请注意我们返回的列表必须与键列表的大小完全相同以便Storm知道哪个周期对应于哪个键。 因此对于数据库中不存在的任何键我们只需在结果列表中放置一个空值即可。 一旦加载Storm就会将一个具有相同相关性ID的元组一个一个地呈现给我们的化简器PeriodBuilder 。 在我们的例子中我们知道在此批次中每个唯一的relativeId最多被调用两次但是一般来说可能更多或者如果当前批次中不存在其他ENTER / LEAVE事件则仅被调用一次。 在对muliget/ multiput的调用与我们的reducer之间借助我们选择的MapState实现Storm让我们可以插入适当的逻辑来重放先前失败的元组。 在以后的文章中有更多的信息…… 一旦我们减少了每个元组序列Storm就会将结果传递给IBackingMap的mulitput在这里我们只是将所有内容“追加”到数据库 public void multiPut(ListListObject keys, ListRoomPresencePeriod newOrUpdatedPeriods) {CassandraDB.DB.upsertPeriods(newOrUpdatedPeriods); } Storm persistenceAggregate使用我们的化简提供给multitput的值自动将其发送到拓扑元组的后续部分。 这意味着我们刚刚建立的在线状态很容易作为元组字段使用我们可以使用它们直接更新会议室时间线 // building room timeline .each(new Fields(presencePeriod), new IsPeriodComplete()) .each(new Fields(presencePeriod), new BuildHourlyUpdateInfo(), new Fields(roomId, roundStartTime)) .groupBy(new Fields(roomId, roundStartTime)) .persistentAggregate( TimelineBackingMap.FACTORY, new Fields(presencePeriod,roomId, roundStartTime), new TimelineUpdater(), new Fields(hourlyTimeline)) 第一行只是过滤掉尚未包含“ ENTER”和“ LEAVE”事件的任何期间。 然后 BuildHourlyUpdateInfo实现一对多的元组发射逻辑对于每个占用期它仅在“开始时间”内发射一个元组。 例如从9:47 am到10:34 am在roomA中的占用将在此处触发针对RoomA的9.00am时间轴切片的元组的发射以及另一个针对10.00am的元组的发射。 下一部分实现了与以前相同的groupBy / IBackingMap方法只是这次使用了两个分组键而不是一个因此mulitget中的List Object将包含两个值一个String和一个Long。 由于我们存储一个小时的时间轴块因此上述IBackingMap的必要属性得到了尊重。 多重获取为每个“ roomId”“开始时间”对检索时间线块然后TimelineUpdater 再次使用reducer用与当前批次中找到的该时间线片相对应的每个存在时间更新时间线片这就是BuildHourlyUpdateInfo的一对多元组发射逻辑和multiput仅保存结果。 导致咖啡厅占用 当我们看着它时一切总是更加美丽所以让我们来绘制房间的占用情况 。 稍加一些R代码 我们就可以一分钟一分钟地看到房间的占用情况这并不意味着什么因为所有数据都是随机的但是…… 结论 希望本文能为维护Storm拓扑中的状态提供一种有用的方法。 我还尝试说明了将处理逻辑实现为小型拓扑元素的实现将其彼此插入而不是将一些“冗长的螺栓”捆绑在冗长而复杂的逻辑部分上。 Storm的一个重要方面是它的可扩展性很可能去插入它的子类或在任何地方插入它的子类来调整其行为。 春天有十年前的那种聪明而有趣的感觉哦该死我现在有点老了……^ __ ^ 参考来自Svend博客的 JCG合作伙伴 Svend Vanderveken 使用Storm进行的可伸缩实时状态更新 。 翻译自: https://www.javacodegeeks.com/2013/08/scalable-real-time-state-update-with-storm.html使用storm 实时计算
http://www.zqtcl.cn/news/222651/

相关文章:

  • 遵义建设厅官方网站 元丰兰州网站设计有限公司
  • 芜湖做网站的公司排名贵阳好的网站建设公司
  • 网站建设 骏域网站建设专家最有效的15个营销方法
  • 大连品牌官网建站为什么有些网站更新的信息看不到
  • 富阳市网站域名申请好了怎么做网站
  • 做药物分析必须知道的网站网站攻击一般有那些
  • 一般网站做哪些端口映射那个网站做境外自由行便宜
  • 网站的建站过程公司seo是什么意思
  • 胜利油田局域网主页入口seo自学网官网
  • 阜阳网站是网站开发与设计专业
  • 网站建设哪个品牌好网站新备案不能访问
  • 网站备案号申请流程华为企业文化
  • 服装网站目标互联网舆情报告
  • 1.网站开发的详细流程电商网站开发文档
  • 域名估价网站制作网站需要注意什么
  • 新浪云虚拟主机做电影网站用什么l软件做网站了
  • 方城网站建设猴痘的治疗方法
  • 做响应式网站有什么插件哔哩哔哩免费安装
  • 织梦网站默认密码wordpress菜单页和文章页路径不同
  • 那些网站可以做兼职网站建设与维护 东博
  • 快速建站的模板建设银行嘉兴分行官方网站
  • 江西智能网站建设wordpress三栏博客主题
  • 怎么做网站账号注册机sem竞价
  • 吕梁建设机械网站怎么让网站排名上去
  • 网站建设的需要分析龙岗招聘网
  • 如何制作企业的网站网站开发答辩ppt
  • 大连中山网站建设网站在线qq代码
  • 南昌seo网站微商城网站建设如何
  • anker 网站建设手机可以做网站的服务器吗
  • 门户网站建设 报价没有网页快照对网站有什么影响