当前位置: 首页 > news >正文

做废旧哪个网站好女人做绿叶网站相亲拉人

做废旧哪个网站好,女人做绿叶网站相亲拉人,培训前端开发,珠海网站策划公司简介#xff1a; 网易游戏流式 ETL 建设实践及调优经验分享#xff5e; 网易游戏资深开发工程师林小铂为大家带来网易游戏基于 Flink 的流式 ETL 建设的介绍。内容包括#xff1a; 专用 ETLEntryX 通用 ETL调优实践未来规划 一. 业务背景 网易游戏 ETL 服务概况 网易游戏的…简介 网易游戏流式 ETL 建设实践及调优经验分享 网易游戏资深开发工程师林小铂为大家带来网易游戏基于 Flink 的流式 ETL 建设的介绍。内容包括   专用 ETLEntryX 通用 ETL调优实践未来规划 一. 业务背景 网易游戏 ETL 服务概况 网易游戏的基础数据主要日志方式采集这些日志通常是非结构化或半结构化数据需要经过数据集成 ETL 才可以入库至实时或离线的数据仓库。此后业务用户才可以方便地用 SQL 完成大部分数据计算包括实时的 Flink SQL 和离线的 Hive 或 Spark。 网易游戏数据集成的数据流与大多数公司大同小异主要有游戏客户端日志、游戏服务端日志和其他周边基础的日志比如 Nginx access log、数据库日志等等。这些日志会被采集到统一的 Kafka 数据管道然后经由 ETL 入库服务写入到 Hive 离线数据仓库或者 Kafka 实时数据仓库。 这是很常见的架构但在我们在需求方面是有一些比较特殊的情况。 网易游戏流式 ETL 需求特点 首先不同于互联网、金融等行业基本常用 MySQL、Postgres 等的关系型数据库游戏行业常常使用 MongoDB 这类 schema-free 的文档型数据库。这给我们 ETL 服务带来的问题是并没有一个线上业务的准确的 schema 可以依赖在实际数据处理中多字段或少字段甚至一个字段因为玩法迭代变更为完全不同的格式这样的情况都是可能发生的。这样的数据异构问题给我们 ETL 的数据清洗带来了比较高的成本。 其次也是由于数据库选型的原因大部分业务的数据库模式都遵循了反范式设计会刻意以复杂内嵌的字段来避免表间的 join。这种情况给我们带来的一个好处是在数据集成阶段我们不需要去实时地去 join 多个数据流坏处则是数据结构可能会非常复杂多层嵌套十分常见。 然后由于近年来实时数仓的流行我们也同样在逐步建设实时数据仓库所以复用现有的 ETL 管道提取转换一次加载到实时离线两个数据仓库成为一个很自然的发展方向。 最后我们的日志类型多且变更频繁比如一个玩法复杂的游戏可能有 1,000 个以上的日志类型每两周可能就会有一次发版。在这样的背景下 ETL 出现异常数据是不可避免的。因此我们需要提供完善的异常处理让业务可以及时得知数据异常和通过流程修复数据。 日志分类及特点 为了更好地针对不同业务使用模式优化我们对不同日志类型的业务提供了不同的服务。我们的日志通常分为三个类型运营日志、业务日志和程序日志。 运营日志记录的是玩家行为事件比如登录帐号、领取礼包等。这类日志是最为重要日志有固定的格式也就是特定 header json 的文本格式。数据的主要用途是做数据报表、数据分析还有游戏内的推荐比如玩家的组队匹配推荐。 业务日志记录的是玩家行为以外的业务事件这个就比较广泛比如 Nginx access log、CDN 下载日志等等这些完全没有固定格式可能是二进制也可能是文本。主要用途类似于运营日志但更加丰富和定制化。 程序日志记录是程序的运行情况也就是平时我们通过日志框架打的 INFO、ERROR 这类日志。程序日志主要用途是检索定位运行问题通常是写入 ES但有时数量过大或者需要提取指标分析时也会写入数据仓库。 网易游戏 ETL 服务剖析 针对这些日志分类我们具体提供了三类 ETL 入库的服务。首先是运营日志专用的 ETL这会根据运营日志的模式进行定制化。然后是通用的面向文本日志的 EntryX ETL 服务它会服务于运营日志以外的所有日志。最后是 EntryX 无法支持的特殊 ETL 需求比如有加密或者需要进行特殊转换的数据这种情况下我们就会针对性地开发 ad-hoc 作业来处理。 二. 运营日志专用 ETL 运营日志 ETL 发展历程 运营日志 ETL 服务有着一个比较久的历史。大概在 2013 年网易游戏就建立了基于 Hadoop Streaming Python 预处理/后处理的第一版离线 ETL 框架。这套框架是平稳运行了多年。 在 2017 年的时候随着 Spark Streaming 的崭露头角我们开发了基于 Spark Streaming 的第二个版本相当于一个 POC但因为微批调优困难且小文件多等问题没有上线应用。 时间来到 2018 年当时 Flink 已经比较成熟我们也决定将业务迁移到 Flink 上所以我们很自然地开发了基于 Flink DataStream 的第三版运营日志 ETL 服务。这里面比较特殊的一点就是因为长久以来我们业务方积累了很多 Python 的 ETL 脚本然后新版最重要的一点就是要支持这些 Python UDF 的无缝迁移。 运营日志 ETL 架构 接下来看下两个版本的架构对比。 在早期 Hadoop Streaming 的版本里面数据首先会被 dump 到 HDFS 上然后 Hadoop Streaming 启动 Mapper 来读取数据并通过标准输入的方式传递给 Python 脚本。Python 脚本里面会分为三个模块首先预处理 UDF这里通常会进行基于字符串的替换一般用作规范化数据比如有些海外合作厂商的时间格式可能跟我们不同那么就可以在这里进行统一。预处理完的数据会进入通用的解析/转换模块这里我们会根据运营日志的格式来解析数据并进行通用转换比如滤掉测试服数据。通用模块之后最后还有一个后处理模块进行针对字段的转换比如常见的汇率转换。之后数据会通过标准输出返回给 Mapper然后 Mapper 再将数据批量写到 Hive 目录中。 我们用 Flink 重构后数据源就由 HDFS 改为直接对接 Kafka而 IO 模块则用 Flink 的 Source/Sink Operator 来代替原本的 Mapper然后中间通用模块可以直接重写为 Java剩余的预处理和后处理则是我们需要支持 Python UDF 的地方。 Python UDF 实现 在具体实现上我们在 Flink ProcessFunction 之上加入了 Runner 层Runner 层负责跨语言的执行。技术选型上是选了 Jython而没有选择 Py4j主要因为 Jython 可以直接在 JVM 里面去完成计算不需要额外启动 Python 进程这样开发和运维管理成本都比较低。而 Jython 带来的限制比如不支持 pandas 等基于 c 的库这些对于我们的 Python UDF 来说都是可接受的。 整个调用链是ProcessFunction 在 TaskManager 被调用时会在 open 函数延迟初始化 Runner这是因为 Jython 是不可序列化的。Runner 初始化时会负责资源准备包括将依赖的模块加入 PYTHONPATH然后根据配置反射调用 UDF 函数。 调用时对于预处理 UDF Runner 会把字符串转化为 Jython 的 PyUnicode 类型而对于后处理 UDF 则会把解析后的 Map 对象转为 Jython 的 PyDcitionary分别作为两者的输入。UDF 可以调用其他模块进行计算最终返回 PyObject然后 Runner 再将其转换成 Java String 或者 Map返回给 ProcessFunction 输出。 运营日志 ETL 运行时 刚刚是 UDF 模块的局部视图我们再来看下整体的 ETL 作业视图。首先在我们提供了通用的 Flink jar当我们生成并提交 ETL 作业到作业平台时调度器会执行通用的 main 函数构建 Flink JobGraph。这时会从我们的配置中心也就是 ConfigServer拉取 ETL 配置。ETL 配置中包含使用到的 Python 模块后端服务会扫描其中引用到的其他模块把它们统一作为资源文件通过 YARN 分发功能上传到 HDFS 上。在 Flink JobManager 和 TaskManager 启动时这些 Python 资源会被 YARN 自动同步到工作目录上备用。这就是整个作业初始化的过程。 然后因为 ETL 规则的小变更是很频繁的比如新增一个字段或者变更一下过滤条件如果我们每次变更都需要重启作业那么作业重启带来的不可用时间会对我们的下游用户造成比较糟糕的体验。因此我们对变更进行了分类对于一些不影响 Flink JobGraph 的轻量级变更支持热更新。实现的方式是每个 TaskManager 启动一个热更新线程定时轮询配置中心同步配置。 三. EntryX 通用 ETL 接下来介绍我们的通用 ETL 服务 EntryX。这里的通用可以分为两层意义首先是数据格式上的通用支持非结构化到结构化的各种文本数据其次是用户群体的通用目标用户覆盖数据分析、数据开发等传统用户和业务程序、策划这些数据背景较弱的用户。 EntryX 基本概念 先介绍 EntryX 的三个基本概念Source、StreamingTable 和 Sink。用户需要分别配置这个三个模块系统会根据这些自动生成 ETL 作业。 Source 是 ETL 作业的输入源通常是从业务端采集而来的原始日志 topic或者是经过分发过滤后的 topic。这些 topic 可能只包含一种日志但更多情况下会包含多种异构日志。 接下来 StreamingTable一个比较通俗的名称就是流表。流表定义了 ETL 管道的主要元数据包括如何转换数据还有根据转换好的数据定义的流表 schema将数据 schema 化。流表 schema 是最为关键的概念它相当于 Table DDL主要包括字段名、字段数据类型、字段约束和表属性等。为了更方便对接上下游流表 schema 使用的是自研的 SQL-Like 的类型系统里面会支持我们一些拓展的数据类型比如 JSON 类型。 最后 Sink 负责流表到目标存储的物理表的映射比如映射到目标 Hive 表。这里主要需要 schema 的映射关系比如流表哪个字段映射到目标表哪个字段流表哪个字段用作目标 Hive 表分区字段。在底层系统会自动根据 schema 映射关系来提取字段并将数据转换为目标表的存储格式加载到目标表。 EntryX ETL 管道 再来看下 EntryX ETL 管道的具体实现。蓝色部分是外部存储系统而绿色部分则是 EnrtyX 的内部模块。 数据首先从对接采集的原始数据 Topic 流入经过 Source 摄入到 Filter。Filter 负责根据关键词过滤数据通常来说我们要求过滤完的数据是有相同 schema 的。经过这两步数据完成 Extract来到 Transform 阶段。 Transform 第一步是解析数据也就是这里的 Parser。Parser 支持 JSON/Regex/Csv 三种解析基本可以覆盖所有案例。第二步是对数据进行转换这是由 Extender 负责的。Extender 通过内置函数或 UDF 计算衍生字段最常见的是将 JSON 对象拉平展开提取出内嵌字段。最后是 FormatterFormatter 会根据之前用户定义的字段逻辑类型将字段的值转为对应的物理类型。比如一个逻辑类型为 BIGINT 的字段我们在这里会统一转为 Java long 的物理类型。 数据完成 Transform 之后来到最后的 Load 阶段。Load 第一步是决定数据应该加载到哪个表。Splitter 模块会根据每个表的入库条件也就是一个表达式来分流数据然后再到第二步的 Loader 来负责将数据写到具体的外部存储系统。目前我们支持 Hive/Kafka 两种存储Hive 支持 Text/Parquet/JSON 三种格式而 Kafka 支持 JSON 和 Avro 两种格式。 实时离线统一 Schema 在 Entryx 的设计里数据可以被写入实时和离线两个数据仓库也就是说同一份数据但在不同的存储系统中以不同格式表示。从 Flink SQL 的角度来说是 schema 部分相同但 connector 和 format 不同的两个表。而 schema 部分经常会随业务变更而 connector 和 format也就是存储系统和存储格式是相对稳定的。那么一个很自然的想法就是能不能将 schema 部分提取出来独立维护实际上这个抽象的 schema 已经存在了就是我们在 ETL 提取的流表 schema。 在 EntryX 里面流表 schema 是与序列化器、存储系统无关的 schema作为 Single Source of Truth。基于流表 schema加上存储系统信息和存储格式信息我们就可以衍生出具体的物理表的 DDL。目前我们主要是支持 Hive/Kafka如果之后要拓展至支持 ES/HBase 表也是非常方便。 实时数据仓库集成 EntryX 一个重要的定位是作为实时仓库的统一入口。刚刚其实已经多次提到 Kafka 表但还没有说实时数仓是怎么做的。实时数仓的常见问题是 Kafka 并没有原生支持 schema 元数据的持久化。目前社区的主流解决方案是基于 Hive MetaStore 来保存 Kafka 表的元数据并复用 HiveCatalog 来直接对接到 Flink SQL。 但这对于我们来说使用 Hive MetaStore 主要有几个问题一是在实时作业里引入 Hive 依赖并与 Hive 耦合这是很重的依赖导致定义的表很难被其他组件复用包括 Flink DataStream 用户二是我们已经有 Kafka SaaS 平台 Avatar 来管理物理 schema比如 Avro schema如果再引入 Hive MetaStore 会导致元数据的割裂。因此我们是拓展了 Avatar 平台的 schema 注册中心同时支持逻辑 schema 和物理 schema。 那么实时数仓和 EntryX 的集成关系是首先我们有 EntryX 的流表 schema在新建 Sink 的时候调用 Avatar 的 schema 接口根据映射关系生成逻辑 schema而 Avatar 再根据 Flink SQL 类型与物理类型的映射关系生成 topic 的物理 schema。 与 Avatar schema 注册中心配套的还有我们自研的 KafkaCatalog它负责读取 topic 的逻辑和物理 schema 来生成 Flink SQL 的 TableSource 或 TableSink。而对于一些 Flink SQL 以外的用户比如 Flink DataStream API 的用户他们也可以直接读取物理 schema 来享受到数据仓库的便利。 EntryX 运行时 和运营日志 ETL 类似在 EntryX 运行时系统会基于通用的 jar 和配置生成 Flink 作业但这里有两种情况需要特别处理。 首先是一个 Kafka topic 往往有几十甚至上千种日志那么对应其实有也几十甚至上千的流表如果每个流表都单独运行在一个作业里那么一个 topic 会可能会被读上千遍这是非常大的浪费。因此在作业运行时提供一个优化策略可以将同个 source 的不同流表合并到一个作业里跑。比如图中某个手游上传了 3 种日志到 Kafka用户分别配置了玩家注册、玩家登录、领取礼包三个流表那么我们可以这三个流表合并起来到一个作业共享同一个 Kafka Source。 另外的一个优化是一般情况下我们可以按照之前“提取转换一次加载一次”的思路来将数据同时写到 Hive 和 Kafka但是由于 Hive 或者说 HDFS 毕竟是离线系统实时性比较差写入在一些负载比较高的 HDFS 老集群经常会出现反压同时阻塞上游导致 Kafka 的写入也受到影响。在这种情况下我们通常要分离加载到实时和离线的 ETL 管道具体会取决于业务的 SLA 还有 HDFS 的性能。 四.调优实践 接下来给大家分享下我们在 ETL 建设中的调优实践经验。 HDFS 写入调优 首先是 HDFS 写入的调优。流式写入 HDFS 场景中老生常谈的一个问题便是小文件过多。通常来说小文件和实时性是鱼与熊掌不可兼得。如果要延迟低那么我们需要频繁地滚动文件来提交数据必然导致小文件过多。 小文件过多主要造成两个问题一从 HDFS 集群管理角度看小文件会占用大量的文件数和 block 数浪费 NameNode 内存二是从用户角度看读写效率都会降低因为写的时候要更频繁地调用 RPC 和 flush 数据造成更多的阻塞有时甚至造成 checkpoint 超时而读时则需要打开更多的文件才能读完数据。 HDFS 写入调优 - 数据流预分区 我们在优化小文件问题时做的一点调优是对数据流先做一遍预分区具体来说便是在 Flink 作业内部先基于目标 Hive 表进行一次 keyby 分区让同一个表的数据尽量集中在少数的几个 subtask 上。 举个例子假设 Flink 作业并行度为 n而目标 Hive 分区数为 m 个。因为每个 subtask 都有可能读到任意分区的数据在默认的各 subtask 完全并行的情况下每个 subtask 都会写所有分区造成总体的写入文件数是 n * m。假设 n 是 100m 是 1000按 10 分钟滚一次文件算每天会造成 14,400,000 个文件这对于很多老集群来说是非常大的压力。 如果经过数据流分区的优化之后我们就可以限制住 Flink 并行度带来的增长。比如我们 keyby hive 表字段并加入范围为 0-s 整数的盐来避免数据倾斜那么分区最多会被 s 个 subtask 读写。假设 s 是 5比起原先 n 是 100那么我们就将原本的文件数降低为原来 20 分之一。 基于 OperatorState 的 SLA 统计 第二个我想分享的是我们的 SLA 统计工具。背景是我们的用户经常会通过 Web UI 来进行调试和问题的排查比如不同 subtask 的输入输出数目但这些 metric 会因为作业重启或者 failover 而重置因此我们开发了基于 OperatorState 的 SLA-Utils 工具来统计数据的输入和分类输出。这个工具设计得非常轻量级可以很容易集成到我们自己的服务或者用户的作业里面。 在 SLA-Utils 里面我们支持了三种 metric。首先是标准的 metric有 recordsIn/recordsOut/recordsDropped/recordsErrored分别对应输入记录数/正常输出记录数/被过滤掉的记录数/处理异常的记录数。通常来说 recordsIn 就等于后面三者的总和。第二种用户可以自定义的 metric通常可以用于记录更详细的原因比如是 recordsEventTimeDropped 代表数据是因为 event time 被过滤的。 那么上述两种 metric 静态的也就是说 metric key 在作业运行前就要确定此外 SLA-Utils 还支持在运行时动态注册的 TTL metric。这种 metric 通常有动态生成的日期作为前缀在经过 TTL 的时间之后被自动清理。TTL metric 主要可以用于做天级别时间窗口的统计。这里比较特别的一点是因为 OperatorState 是不支持 TTL 的SLA-Utils 是在每次进行 checkpoint 快照的时候进行一次过滤剔除掉过期的 metric以实现 TTL 的效果。 那么在 State 保存了 SLA 指标之后要做的就是暴露给用户。我们目前的做法是通过 Accumulater 的方式来暴露优点是 Web UI 有支持开箱即用同时 Flink 可以自动合并不同的 subtask 的 metric。缺点在于没有办法利用 metric reporter 来 push 到监控系统同时因为 Acuumulater 是不能在运行时动态注销的所以使用 TTL metric 会有内存泄漏的风险。因此在未来我们也考虑支持 metric group 来避免这些问题。 数据容错及恢复 最后再分享下我们在数据容错和恢复上的实践。 以很多最佳实践相似我们用 SideOutput 来收集 ETL 各环节中出错的数据汇总到一个统一的错误流。错误记录中包含我们预设的错误码、原始输入数据以及错误类和错误信息。一般情况下错误数据会被分类写入 HDFS用户通过监控 HDFS 目录可以得知数据是否正常。 那么存储好异常数据后下一步就是要恢复数据。这通常有两种情况。 一是数据格式异常比如日志被截断导致不完整或者时间戳不符合约定格式这种情况下我们一般通过离线批作业来修复数据重新回填到原有的数据管道。 二是 ETL 管道异常比如数据实际的 schema 有变更但流表配置没有更新可能会导致某个字段都是空值这时我们的处理办法是首先更新线上的流表配置为最新保证不再产生更多异常数据这时 Hive 里面仍有部分分区是异常的。然后我们发布一个独立的补数作业来专门修复异常的数据输出的数据会写到一个临时的目录并在 hive metastore 上切换 partition 分区的 location 来替换掉原来的异常目录。因此这样的一个补数流程对离线查询的用户来说是透明的。最后我们再在合适的时间替换掉异常分区的数据并恢复 location。 五.未来规划 最后介绍下我们的未来规划。 第一个是数据湖的支持。目前我们的日志绝大多数都是 append 类型不过随着 CDC 和 Flink SQL 业务的完善我们可能会有更多的 update、delete 的需求因此数据湖是一个很好的选择。第二个会提供更加丰富的附加功能比如实时的数据去重和小文件的自动合并。这两个都是对业务方非常实用的功能。最后是一个支持 PyFlink。目前我们的 Python 支持只覆盖到数据集成阶段后续数据仓库的 Python 支持我们是希望通过 PyFlink 来实现。 作者阿里云实时计算Flink 原文链接 本文为阿里云原创内容未经允许不得转载
http://www.zqtcl.cn/news/699808/

相关文章:

  • 青岛快速建站模板制作公司网页什么价位
  • 网站建设公司的经营范围wordpress设置文本编辑器
  • 做网站用微软雅黑侵权吗wordpress 同类文章
  • 免费下载建设银行官方网站自己做网站犯法吗
  • 手机网站html代码附近做广告牌的店
  • 建设和优化网站的步骤wordpress 模板 含数据库
  • 太原制作网站的工作室wordpress弹幕播放器
  • 英语网站开发菏泽做网站优化的
  • 宜昌建设网站公司做网站语言服务器 空间
  • 湖南做网站价格广州网站建设哪家便宜
  • 建筑工程素材资源网站中山做网站建设联系电话
  • 做网站关键词集团网站群建设方案
  • 网站开发有哪些课程网站开发好要租服务器吗
  • 鲜花店网站建设的规模设想网站之间的差异
  • 网站怎么在百度做推广郑州建网站
  • 机关门户网站建设顺义做网站
  • 网站开发公司东莞环球军事头条
  • 企业网站管理系统添加教程如何用python开发网页
  • 公司网站建设需要资质wordpress admin
  • 万维网网站301重定向怎么做国家城乡建设规划部网站
  • 现在的网站内容区域做多宽俄文网站开发翻译
  • 上海闵行建设局官方网站做电影网站的流程
  • 怎样做水族馆网站wordpress第三方订阅地址
  • 东莞做网站注意事项如何查网站的百度快照
  • 做资源网站需要什么郑州哪有做网站的公司
  • 不属于网站架构开发一个游戏软件多少钱
  • 电子商务网站建设 市场分析广州有哪些做网站专业的公司
  • 广州网站建设南宁厦门城健建设有限公司网站
  • 课程网站开发的研究现状网页设计制作音乐网站
  • 建设工程法律网站网站美工做专题尺寸多少?