当前位置: 首页 > news >正文

株洲企业网站建设品牌网页设计与制作课程评价

株洲企业网站建设品牌,网页设计与制作课程评价,长春网页网站制作,广告毕业设计作品网站Table of Content1. 课程2. 前置技能3. 一、数据湖概念[了解] 3.1. 1.1 企业的数据困扰 3.1.1. 困扰一#xff1a;互联网的兴起和数据孤岛3.1.2. 困扰二#xff1a;非结构化数据3.1.3. 困扰三#xff1a;保留原始数据3.1.4. 补充#xff1a;什么是结构化#xff1f; 3.1.4…Table of Content1. 课程2. 前置技能3. 一、数据湖概念[了解] 3.1. 1.1 企业的数据困扰 3.1.1. 困扰一互联网的兴起和数据孤岛3.1.2. 困扰二非结构化数据3.1.3. 困扰三保留原始数据3.1.4. 补充什么是结构化 3.1.4.1. 结构化数据3.1.4.2. 非结构化数据3.1.4.3. 半结构化数据3.2. 1.2 数据湖的提出3.3. 1.3 所以数据湖是什么3.4. 1.4 为什么叫做数据的湖3.5. 1.5 数据仓库-数据集市-数据湖区别 3.5.1. 数据湖 3.5.1.1. 数据仓库3.5.1.2. 数据集市4. 二、数据湖理论[了解] 4.1. 2.1 写时模式 VS 读时模式 4.1.1. 写时模式4.1.2. 读时模式4.2. 2.2 数据湖构建的几种常规方式 4.2.1. 方案一基于Hadoop生态体系的数据湖实施方案4.2.2. 方案二基于云平台的数据湖实施方案4.2.3. 方案三基于商业公司提供的商业数据湖产品4.3. 2.3 企业为何需要数据湖对企业有何用处【了解】4.4. 2.4 数据湖概念总结 4.4.1. 2.4.1 特点4.4.2. 2.4.2 对比数仓 4.4.2.1. 2.4.2.1 模式上4.4.2.2. 2.4.2.2 使用思维上4.4.2.3. 2.4.2.3 处理数据上4.4.3. 2.4.3 数据湖的优势4.4.4. 2.4.4 数据湖的要求4.5. 2.5 如何设计一个成功的数据湖架构 4.5.1. 2.5.1 数据湖架构的4个指导原则 4.5.1.1. 原则1 分离数据 和 业务4.5.1.2. 原则2 存储和计算的分离可选比较适用云平台4.5.1.3. 原则3 Lambda架构 VS Kappa架构 VS IOTA架构4.5.1.4. 原则4 管理服务的重要性和选择合适的工具5. 三、数据处理、数据应用的几种架构[拓展] 5.1. 3.1 Lambda架构[重点了解] 5.1.1. 3.1.1 简介5.1.2. 3.1.2 Lambda架构关键特性5.1.3. 3.1.3 数据查询的本质5.1.4. 3.1.4 Lambda的三层架构5.1.5. 3.1.5 Batch Layer 批处理层5.1.6. 3.1.6 Speed Layer 速度层5.1.7. 3.1.7 Serving layer 服务层5.1.8. 3.1.8 Lambda架构的组件选型5.1.9. 3.1.9 Lambda架构的总结5.1.10. 3.1.10 Lambda架构的缺点【拓展】5.2. 3.2 Kappa架构[了解]5.3. 3.3 IOTA架构[了解]6. 四、数据湖基于Hadoop、Spark的实现[掌握] 6.1. 4.1 数据湖的核心 6.1.1. 4.1.1 存储层6.1.2. 4.1.2 数据管理 6.1.2.1. 1. 安全6.1.2.2. 2. 审计6.1.2.3. 3. 元数据管理6.1.3. 4.1.3 数据处理7. 五、Delta Lake - 数据湖核心的增强[重点] 7.1. 5.1 什么是Delta Lake7.2. 5.2 Delta Lake 有什么特性7.3. 5.3 Delta Lake 重点特性解读 7.3.1. 中间数据 7.3.1.1. ACID 事务控制7.3.1.2. 数据版本控制7.3.1.3. 可伸缩的元数据处理7.3.1.4. 审核历史记录7.3.1.5. 统一的批处理和流处理的source 和 sink:7.4. 5.4 Delta Lake 的使用形式8. 六、Delta Lake - Quickstart[熟练] 8.1. 6.1 安装 8.1.1. 6.2 互动式 8.1.1.1. PySpark8.1.1.2. Spark Scala Shell8.1.2. 6.3 包含在工程中 8.1.2.1. Maven8.1.2.2. Scala SBT8.1.3. 6.4 创建表 8.1.3.1. 补充Python和Java的操作方式8.1.4. 6.5 读取数据 8.1.4.1. 补充Python和Java的操作方式8.1.5. 6.6 更新数据 8.1.5.1. 补充Python和Java的操作方式8.1.6. 6.7 有条件的更新而不覆盖 8.1.6.1. 补充Python和Java的操作方式8.1.7. 6.8 使用时间旅行读取旧版本的数据 8.1.7.1. 补充Python和Java的操作方式8.1.8. 6.9 事务日志9. 七、Delta Lake 操作[熟练] 9.1. 7.1 表批量读写 9.1.1. 对数据进行分区9.1.2. 追加数据9.2. 7.2 Schema验证 9.2.1. 测试修改Schema能否写入9.2.2. 如何强制执行9.2.3. 那如何让Schema中列减少呢以当前Schema强制覆盖过去9.3. 7.3 表更新、删除对Parquet数据文件的影响 9.3.1. 更新指定行9.3.2. 删除指定行9.4. 7.4 Delta Lake 表实用工具 9.4.1. Vacuum9.4.2. 历史9.4.3. 生成9.4.4. 将Delta Lake的表转换为普通的Parquet表9.4.5. 将普通的parquet表转换为Delta9.5. 7.5 Delta Lake 阶段总结9.6. 7.6 其他存储系统的配置 9.6.1. AWS - S3 9.6.1.1. 要求9.6.1.2. 快速开始9.6.1.3. S3的配置9.6.2. Microsoft Azure存储 9.6.2.1. 要求9.6.2.2. 快速开始9.6.2.3. 配置Azure Data Lake Storage Gen19.6.2.4. 配置Azure Blob存储10. 八、Delta Lake - 理论[理解] 10.1. 1. 理解Delta Lake的事务日志 10.1.1. 1.1 什么是事务日志10.1.2. 1.2 事务日志如何工作 10.1.2.1. 1.2.1 将事务分解成原子提交10.1.2.2. 1.2.2 文件级别的Delta Lake事务日志10.1.2.3. 1.2.3 使用检查点文件快速重新计算状态10.1.2.4. 1.2.4 处理多个并发读取和写入 10.1.2.4.1. 乐观并发控制10.1.2.4.2. 乐观的解决冲突10.1.3. 1.3 其他用例 10.1.3.1. 1.3.1 时间旅行10.1.3.2. 1.3.2 数据审查10.2. 2. 模式验证和演变 10.2.1. 2.1 了解表架构10.2.2. 2.2 模式验证10.2.3. 2.3 模式验证如何工作10.2.4. 2.4 模式验证有何用处10.2.5. 2.5 模式演变 10.2.5.1. 有什么用10.3. 3. Delta Lake 最佳实践 10.3.1. 3.1 选择合适的分区列10.3.2. 3.2 合并文件compact10.4. 总结11. 九、企业数据湖应用案例分析[实操一遍] 11.1. 1. 需求分析11.2. 2. 需求实现 11.2.1. 2.1 转换原始数据生成基础表11.2.2. 2.2 添加新列到基础表11.2.3. 2.3 聚合每小时的数据统计每小时TOP1011.2.4. 2.4 统计全天热门TOP10011.2.5. 2.5 将输出的数据合并为1个parquet文件11.3. 3. 总结12. 十、基于AWS的云上数据湖实现方案介绍[了解] 12.1. 1. 云平台的介绍 12.1.1. 1.1 前言12.1.2. 1.2 云平台的概念 12.1.2.1. 举个例子:12.1.3. 1.3 云平台的分类 12.1.3.1. 私有云平台12.1.3.2. 公有云平台12.1.4. 1.4 主流公有云平台12.1.5. 1.5 云的三种服务 12.1.5.1. IaaS12.1.5.2. PaaS12.1.5.3. SaaS12.1.6. 1.6 公有云对企业或者个人的意义12.2. 2. AWS的数据湖解决方案 12.2.1. 2.1 存储层[重点]12.2.2. 2.2 数据分析[重点] 12.2.2.1. Server Less的大数据分析引擎Amazon Athena12.2.2.2. 测试12.2.2.3. AWS之上的Hadoop EMR [重点]12.2.3. 2.3 数据处理ETL[了解]12.2.4. 2.4 AWS上的实时流服务[了解]12.2.5. 2.5 AWS上的数仓服务[了解]12.2.6. 2.6 AWS上的KV存储NoSQL - DynamoDB[了解]12.2.7. 2.7 数据应用[了解] 12.2.7.1. BI12.2.7.2. API服务12.2.8. 2.8 安全、审查、授权[了解]12.2.9. 2.9 AWS数据湖方案总结 1. 课程 理解数据湖的概念掌握Delta Lake 框架的应用了解在云上的数据湖实现 2. 前置技能 学习本课程需要你最少需要掌握 基本的Scala语言使用了解Spark、SparkSQL对大数据技术体系有一定的了解 如达不到前置技能的要求可能在理解上比较困难建议同学们可以先了解一下相关内容后再来学习本课程。 3. 一、数据湖概念[了解] 步骤 了解企业数据使用方面的需求了解需求催生数据湖架构数据湖和传统的数仓的简单对比 3.1. 1.1 企业的数据困扰 我们学习到这里已经接触到了如数据库、数据仓库、NoSQL数据库、消息队列、流式计算、缓存等等一系列的数据管理形式。 我们来回顾一下这些数据管理形式都分别提供了什么功能 数据库 提供数据的存储和查询 数据仓库 提供数据的集中存储的分析 NoSQL数据库也同样提供数据的存储的查询 消息队列提供数据的转移通道 流式计算提供高效的数据的加工和分析 缓存系统提供数据的快速加载 可以看到以上我们接触到的数据管理形式提供了多种多样的功能正常来说应该已经足以满足企业在数据管理和利用方面的各种需求。 但是我们仍会说企业有数据管理和利用方面的困扰。 那么这些困扰来自哪里 3.1.1. 困扰一互联网的兴起和数据孤岛 随着互联网的兴起企业内客户数据大量涌现。为了存储这些数据单个数据库已不再足够公司通常会建立多个按业务部门组织的数据库来保存数据。随着数据量的增长公司通常可能会构建数十个独立运行的业务数据库这些数据库具有不同的业务和用途 一方面这是一种福气有了更多更好的数据公司能够比以往更精确地定位客户并管理其运营。 另一方面这导致了数据孤岛整个组织中数据分散到各个地方 由于无法集中存储和利用这些数据公司对于数据的利用效率并不高。 这样的痛苦让公司逐步走向数仓的利用模式。 3.1.2. 困扰二非结构化数据 随着数据仓库的兴起人们发现数据孤岛的问题貌似被数仓解决了。 我们通过ETL、数据管道等程序从各个数据孤岛中抽取数据注入数仓中等待进行维度分析。 看起来有一种数据集中存储的样子。 但是随着互联网的加速发展数据也产生了爆发性的增长数仓就表现出来了一点力不从心 数据增长的太快而由于数据建模的严格性每开发一次数仓的新应用流程就很长。无法适应新时代对于数据快速分析、快速处理的要求 随着数据行业和大数据处理技术的发展原本被遗忘在角落中的一些价值密度低的非结构化数据便慢慢了有了其价值所在对于这些大量的非结构化数据日志、记录、报告等的分析也逐步提上日程 但是数仓并不适合去分析非结构化的数据因为数仓的严谨性其只适合处理结构化的数据。 那么对于非结构化数据的处理数仓就不太适合。 3.1.3. 困扰三保留原始数据 在以前由于大规模存储的成本和复杂性以及大数据技术尚未开始蓬勃发展等客观原因造成企业对于数据的存储是精简的。 也就是能够存入到企业系统中的数据都是经过处理提炼的这些数据撇除了价值密度低的信息只保留了和业务高度相关的核心内容。 这样可以有效的减少企业的数据容量也就减少了存储的成本、以及管理维护的复杂度。 但这样做是有一定的缺点的那就是企业并不保留原始数据或者说保留部分一旦出现数据错误或者其它问题想要从原始的数据中进行溯源就难以完成了。 并且业务并不是一成不变的当初因为业务被精简掉的内容可能对未来的业务有所帮助。 所以无法大量的长期保存原始数据也是企业的困扰之一 数据孤岛非结构化数据分析想要海量的保存原始数据 基于这3个最主要的困扰企业迫切希望能够做到 数据的集中存储解决数据孤岛并且成本可控使用维护简单可以存储任意格式的数据结构化的、非结构化的、半结构化的能够支持大多数分析框架 这样的三种最基本的需求。 那么数据湖的概念也就因这三种需求被逐步的提出并走向人们的视野中。 集中存储成本可控使用简单能够支持任意格式输入并拥有分析处理能力 3.1.4. 补充什么是结构化 那么我们刚刚提到了结构化数据和非结构化数据现在来看一下它们分别是什么。 3.1.4.1. 结构化数据 简单来说就是数据库中的数据。 在最早结构化数据也称之为 行数据。是可以由一个二维表来描述的数据。 也就是通俗的说数据是有表结构的。 不过发展到如今对于结构化的定义就不仅仅是指 数据库中的数据了。 我们可以认为可以用schema定义的数据就是结构化数据。 那么什么是schema呢 可以简单的认为schema就是表结构、或者说是一种对数据结构的描述。 结构化数据是规范的在schema的定义下每一列每一个位置应该是什么类型的数据表达的是什么意义都是确定的。 对于这样的具有确定意义的数据的分析的处理是极为方便的。 name Stringage Intaddress String张三18北京市 目前常见的结构化数据有 数据库中的数据 遵循schema的特定分隔符数据如CSV 等 3.1.4.2. 非结构化数据 那么非结构化数据就应该比较好理解了 非结构化数据就是指无法用schema定义的数据 非结构化数据是非常多的如 我们写程序的代码程序的日志输出各类协议等这些还只是文本如果算上二进制 图片音频视频PPT等都算得上非结构化数据 3.1.4.3. 半结构化数据 其实还有一个类别叫做半结构化数据 这个也很好理解就是指可以用schema定义其一部分的数据但无法定义全部 或者说无法用二维表比如数据库表结构定义的数据描述但是其自身有相应的标记或描述语言对自身进行数据描述的数据其遵循自身所带有的描述或标记规定的结构 常见的半结构化数据有 XML JSON YAML ini 等 以JSON举例如果我们用schema来描述它只能得知如 { “Key”“Value” ......Key: Value } 但是我们无法详细的知道 key是什么特别是value是什么。 因为value可以是一个 字符串可以是数字可以是嵌套的另一个JSON对象或者嵌套了另一个JSON数组都有可能。 所以JSON无法用schema来完整描述但是JSON自身有描述。 也就是我们无法在得到JSON之前用schema来定义它但是当我们得到JSON后就可以用schema来定义它了。 如一串JSON {name: zhangsan, age: 10, like: [football, music]}在拿到这个json前我们不知道其schema是什么但是当我们得到后就可以定义其schema {string: string, string: int, string: Array[String]}但是这个schema 依旧无法和真正结构化数据的schema来进行比较其描述能力是有所减弱的 为何 如上我们只能得知schema中某个位置是string, 某个位置是int 但是在真正的结构化schema中我们是能明确某个位置就是表示的 名字并且是string类型。 因为 {title: developer, level: 10, like: [football, music]}这个json依旧满足上面的schema 所以半结构化数据可能有点不好理解 但也是我们经常接触到的数据。 3.2. 1.2 数据湖的提出 在2011年左右开源大数据技术Hadoop逐步进入企业的视野也开始了蓬勃发展。 大数据技术带来了 海量数据分析的可能性低成本、易维护管理的分布式存储 与此同时基于大数据技术带来的优势以及企业对数据的困扰所产生的需求在2011年数据湖的概念也被提出 在概念中提到数据湖应该做到 集中存储保留原始数据格式支持任意格式支持海量数据分析 以上的诉求大数据技术体系均可以满足。 此时企业开始走向构建数据湖的时代。 数据湖的提出是基于大数据技术的发展如果没有大数据技术数据湖的概念很难被落实。 3.3. 1.3 所以数据湖是什么 根据前面的内容我们可以得出数据湖就是 一种支持任意数据格式、并保留原始数据内容的 大规模存储系统架构并且其支持海量数据的分析处理。 大规模存储系统架构 支持任意数据格式的输入并做到集中存储能够保留海量的原始数据支持海量数据分析处理 3.4. 1.4 为什么叫做数据的湖 我们知道IT技术的命名有时候是和其本身关系不大的比如Hadoop、Pig、Spark等。 有时候看名字我们就知道其是做什么的比如Flume、Zookeeper等。 数据湖的命名Data Lake就是第二种名字贴合其实际意义的。 为什么是湖泊呢 我们前面说过数据湖应该做到 集中存储支持任意数据格式输入等 那么这样的要求是不是很像无论大小河流任意格式均可将水汇入湖泊中集中存储。 转存失败重新上传取消 所以从名字中我们可以解析到数据湖就是一个巨大的数据集合汇聚了来自各个系统的任意格式的原始数据并且能够对湖泊进行利用分析进行水的流出分析、利用的结果 3.5. 1.5 数据仓库-数据集市-数据湖区别 我们应该听说过以下3个概念 数据仓库数据集市数据湖 那么这三者到底有什么区别呢 我们一一看一下 3.5.1. 数据湖 是整个公司内的一个开放的数据中心接收任意类型的数据输入对数据进行集中存储并能对这些数据提供分析服务。 3.5.1.1. 数据仓库 是整个公司的业务数据集合主要针对结构化的业务数据并能提供查询分析服务。 3.5.1.2. 数据集市 是一个小型的部门级别或者工作组级别的数仓。其内部数据主要针对指定业务范围或者为指定人员提供服务。 比较数据仓库数据集市数据湖应用范围全公司部门或工作组全公司数据类型结构化数据处理结构化数据处理任意格式数据处理存储规模大量中等规模小型数仓海量数据应用维度建模、指标分析小范围数据分析海量任意格式分析、不限应用的类型新应用开发周期长长短 数据湖和数仓不是互相 4. 二、数据湖理论 学习步骤 两种数据写入模式构建数据湖的几个常规方式数据湖对企业的用途和数据湖的设计原则 4.1. 2.1 写时模式 VS 读时模式 为了更好的理解数据湖我们先了解一下 写时模式读时模式 这两种模式。 4.1.1. 写时模式 数据在写入之前就需要定义好数据的schema数据按照schema的定义写入 4.1.2. 读时模式 数据在写入的时候不需要定义Schema在需要使用的时候在使用Schema定义它 写时模式和读时模式是两种截然不同的数据处理方法。 我们前面学习的如数据库、数据仓库、数据集市 或者具体的一些框架如MysqlRedis HBase等均是写时模式即数据在写入之前就需要预先有Schema定义好才可以。 而数据湖就是一种读时模式思想的具体体现 相比较写时模式而言读时模式因为是数据在使用到的时候再定义模型结构Schema因此能够提高数据模型定义的灵活性可以满足不同上层业务的高效率分析需求。 因为对于写时模式而言如果想要事后更改Schema是有很高的成本的。 而读时模式可以在用的时候再定义Schema就很灵活了同一套数据可以用不同的Schema来定义来获取不同的效果。 4.2. 2.2 数据湖构建的几种常规方式 想必同学们学习到这里应该会有一定的疑惑就是 数据湖是一种新型的数据库吗还是一种新推出的技术框架吗 答案是 No 我们前面给数据湖一个定义就是 数据湖是一种支持任意数据格式、并保留原始数据内容的 大规模存储系统架构并且其支持海量数据的分析处理。 那么根据定义可以看出数据湖是一种系统的架构方案它并不是一种特殊的数据库也不是某一种技术框架。数据湖是一种概念一种解决问题的思路一种数据治理的方案、一种企业大规模数据集中存储并利用的架构思想 那么数据湖架构是怎么实现的呢 4.2.1. 方案一基于Hadoop生态体系的数据湖实施方案 实际上多数企业对于Hadoop生态的使用本质上是一种数据湖思想的体现。 如企业中会使用 HDFS来作为存储层存储各类各样的原始数据不管是结构的、半结构的、还是非结构的均在HDFS存储。使用Spark、SparkSQL、MR等计算框架作为分析引擎对原始数据进行分析、抽取、计算、利用。使用Flume、Kafka等持续不断的为HDFS落地新数据使用Flink、Storm等实时分析HDFS的数据以及落地结果至HDFS之上。等等。 实际上以上的解决方案或者说数据架构就是数据湖的思想。 我们在来回想一下 以上的技术利用是不是满足了 无论何种数据均可落地存储HDFS无论何种数据均可分析Spark、MR 所以说我们的结论就是 数据湖本质上就是要为企业构建一个数据治理方案方案可以满足无论何种数据结构、半结构、非结构均可集中存储并能够提供分析服务并且能够支撑海量的数据。 另外集中存储也是一个很重要的概念。数据湖的思想是数据原始数据均集中存储起来在需要的时候可以快速抽取进行计算避免这里存一份那里存一份。集中存储集中利用。 HDFS作为底层存储层在企业业务系统中一般是均可访问到的视权限管控的具体情况那么对于企业来说HDFS无处不在在任何需要数据的时候均随时从HDFS中抽取即可。 可以直接让Spark、SparkSQL读时模式后定义Schema去分析可以直接将数据扔给AI集群去做训练可以直接走ETL过程将数据扔入数仓等等。 那么以这种结论来看多数企业均在使用数据湖这样的思想去治理数据。 我们说这是一种思想一种数据治理的方式。对于能够理解并利用数据湖思想的企业其在架构Hadoop生态体系的时候会按照数据湖的思想来构建、架构其数据中心平台。 对于并未想到数据湖或者说不了解数据湖的企业来说其Hadoop生态的体系在架构设计的时候多数是围绕其具体业务来设计的只是多数的架构也恰恰满足数据湖的概念定义。 HDFS 做集中存储能够支持海量的数据以及不限格式的存储输入Spark、SparkSQL MR 可以对这些海量的数据进行分析。 4.2.2. 方案二基于云平台的数据湖实施方案 通过方案一的讲解我们应该明白数据湖不是技术框架而是数据治理的方案这句话的意思了。 那么在云平台上基于云平台提供的技术架构和具体组件来协助构建企业的数据湖实施方案也是一种可行并高效的方式。 集中的海量存储海量的数据分析 我们以AWS为例 以S3对象存储服务为核心提供数据湖的存储层做到集中存储随处访问视权限管控结果以DynamoDBAmazon ES等服务提供元数据存储和查询Schema存储以Firehose、Snowball等服务提供数据导入功能以Athena、EMR、Redshift等服务提供数据的处理和分析功能以STS、Cloudwatch、IAM、API Gatewa等服务提供数据中心的安全、认证、访问、用户接口等功能 可以看出其实和Hadoop生态体系的数据湖差不多也是由一个核心的存储层提供集中存储HDFS、S3然后由一系列计算引擎提供分析计算Spark、EMR并由一系列其它辅助工具提供额外功能如数据导入、权限管控、元数据存储等。 4.2.3. 方案三基于商业公司提供的商业数据湖产品 部分公司选择使用相关商业产品收费来构建企业的数据湖生态。如Zaloni 等。 商业公司的商业产品一般均为闭源实现且价格不菲多数为大型企业以及相关传统企业选用。 主要是花钱买服务一般许多传统行业非互联网等科技企业的大型公司愿意选用因为本身并没太多的技术人员但又有相关需求比较倾向花钱一套解决。 商业产品这里不多做介绍我们主要关注于Hadoop生态和云平台相关。 4.3. 2.3 企业为何需要数据湖对企业有何用处【了解】 我们先来看一下企业中数据仓库的开发流程。 一般我们如果想要开发一个新的数仓应用其开发流程是 提出数仓应用的需求需要某某某报表指标分析根据需求设计数仓的模型和表结构设计完成后编码应用ETL等工具完成数据的输入 可以发现数仓的开发是倒序进行的是以需求为导向的。 这是写时模式的一种体现并且在前期进行需求分析、模型设计、项目编码等一系列操作是传统的应用开发模式比较耗时并繁琐。 在传统的过去这种开发形式没有太多问题但是在数据大规模增长的今天如果企业100%依赖数仓这种模式来进行数据的价值提炼那么企业就很可能跟不上时代发展的步伐。 所以数据湖的价值就体现了出来除了为企业解决我们先前提到的三个困扰以外还有一点对企业很有价值的就是 基于数据湖的开发模式是一种读时模式是一种灵活的、快速的数据处理思路可以快速的对以后数据进行数据分析并让其立刻产生价值。 并且重要的是它能在数字化的新浪潮下真正的帮助企业完成技术转型、完成数据积累、完成高效的数据治理应对快速发展的商业环境下层出不穷的新问题。 要注意的是并不是说有了数据湖之后数仓就是没用的了。并不是这样。 数据湖和数仓是一种互补的存在数据湖基于其集中存储、保留原始格式、读时模式等特点为企业提供了快速挖掘数据价值的能力以及提高数据利用率让每一份数据都发挥其存在的价值。 而数仓为企业提供的是 更加严格的商业数据分析价值密度更高的数据分析针对业务进行的精准数据处理 所以在当下的企业数据湖有其存在的价值数仓同样。 两者是互补的关系合力为企业创造更好的数据价值。 4.4. 2.4 数据湖概念总结 寥寥草草的说了这么多我们来总结一下数据湖的一些特点 4.4.1. 2.4.1 特点 不限格式来之不拒均可流入 当前的时候数据增长巨大、数据来源也是各种各样不管是结构的、半结构的、还是非结构的都可以流入数据湖做集中存储方便利用的时候进行分析 集中存储、到处可访问 数据集中存储起来Hadoop生态使用HDFS、云平台使用S3、OSS等在需要的时候随时进行访问避免了在一些模式下许多业务的数据均分散存储这里一部分那里一部分需要做许多前置工作才能将数据汇总聚合。 高性能分析能力 借助于Spark、MR、SparkSQL等高性能分析计算引擎可以对海量的数据进行分析 原始数据存储 大量的保留原始数据让每一个字段每一段信息都发挥其价值并更好的为企业提供数据溯源、数据修复等一系列功能。 4.4.2. 2.4.2 对比数仓 前面我们简单的对比了一下数据湖、数仓、数据集市。 随着我们对数据湖概念了解的加深我们再次对比一下数仓 4.4.2.1. 2.4.2.1 模式上 数仓 写时模式数据写入前已经定义好Schema更改Schema成本较高 数据湖读时模式数据在利用的时候再定义Schema灵活方便典型例子SparkSQL 基于SparkSQL的后定义Schema读时模式目前多数数据湖的实现方案里面SparkSQL占了很大的份额。 4.4.2.2. 2.4.2.2 使用思维上 数仓先有报表需求根据需求确定数仓Schema然后通过ETL过程将数据导入。也就是先有需求、后准备数据 数据湖并不需要根据需求来开发数据业务。数据集中存储需要的时候再利用。也就是先有数据再根据已有数据开发业务。 这样的方式对比数仓好在 可以完整的保留数据的结构不会因为ETL过程损失数据信息可以加快数据开发的进度适应企业不断增长的业务需求 4.4.2.3. 2.4.2.3 处理数据上 数仓 只针对结构化数据、或部分有严格格式的半结构化数据 数据湖接受任何数据输入 4.4.3. 2.4.3 数据湖的优势 轻松的收集数据读时模式数据湖与数据仓库的一大区别就是Schema On Read即在使用数据时才需要Schema信息而数据仓库是Schema On Write即在存储数据时就需要设计好Schema。这样由于对数据写入没有限制数据湖可以更容易的收集数据。不需要关心数据结构存储数据无限制任意格式数据均可存储只要你能分析就能存。全部数据都是共享的集中存储多个业务单元或者研究人员可以使用全部的数据以前由于一些数据分布于不同的系统上聚合汇总数据是很麻烦的。从数据中发掘更多价值分析能力数据仓库和数据市场由于只使用数据中的部分属性所以只能回答一些事先定义好的问题而数据湖存储所有最原始、最细节的数据所以可以回答更多的问题。并且数据湖允许组织中的各种角色通过自助分析工具MR、Spark、SparkSQL等对数据进行分析以及利用AI、机器学习的技术从数据中发掘更多的价值。具有更好的扩展性和敏捷性数据湖可以利用分布式文件系统来存储数据因此具有很高的扩展能力。开源技术的使用还降低了存储成本。数据湖的结构没那么严格因此天生具有更高的灵活性从而提高了敏捷性。 4.4.4. 2.4.4 数据湖的要求 想必大家应该对数据湖有了清晰的认知了那么为了满足我们需要的 集中存储任意格式输入强大的分析能力 我们需要对数据湖的实现提出如下的要求 安全数据集中存储就对数据安全有了更高的要求对权限的管控要求更加严格。可拓展的随着业务扩张、数据增多要求数据湖体系可以随需求扩展其能力。可靠的作为一个集中存储的数据中心可靠性也很重要三天两头坏掉那是不可以的。吞吐量数据湖作为海量数据的存储对数据的吞吐量要求就必须很高。原有格式存储数据湖我们定义为 所有数据的原始数据集中存储库那么存储进入数据湖的数据就是未经修饰的、原始的数据支持多种数据源的输入不限制数据类型任意数据可以写入多分析框架的支持因为数据格式各种各样并不全是结构化数据所以要求支持多种分析框架对数据湖中的数据进行提取、分析。包括但不限于批处理的、实时的、流的、机器学习的、图形计算的等等。 4.5. 2.5 如何设计一个成功的数据湖架构 那么我们如何才能设计出一个成熟的、成功的数据湖的体系架构呢 要满足一个成功的数据湖架构除了要满足在前面2.4.4 数据湖的要求提到的要求外也要满足下面的4个设计指导原则 4.5.1. 2.5.1 数据湖架构的4个指导原则 数据湖在架构的时候遵循如下4个设计指导原则 4.5.1.1. 原则1 分离数据 和 业务 在数据湖的架构设计中不考虑业务只考虑数据。 也就是我们只站在数据的层面去考虑如何去高效的写入、如何实现可用的集中存储 而不会在这个过程中考虑业务为业务对数据做适配。这些考虑是数仓应该做的。 很多企业也做到了所有数据均存储但是仍旧不能称之为数据湖就是因为很多企业在存储的时候并不能完全舍弃业务只关心数据层面。 比如有的企业认为我的数仓里面存储了全部的企业需要的数据为何不是数据湖其实就是因为在做存储的时候并没有完全的抛弃业务总是因为业务需求对数据进行了拉伸、缩减等处理。 或者有的企业也是将所有数据均存入HDFS但是在存储的时候根据业务需要对数据进行了修饰。 那么这样的操作都不能算构建了数据湖因为数据湖的要求其中之一就是原始数据未经修改的存储存储的是原滋原味的数据而不是modified的数据。 4.5.1.2. 原则2 存储和计算的分离可选比较适用云平台 有这样一个问题当计算容量的不够的时候我们需要对计算进行扩容但是一般的Hadoop使用中计算和存储是在一起的。datanode 和 计算节点复用为了做数据本地计算 对计算进行扩容就会导致存储也一样扩容那么存储的rebalance就会造成存储的资源消耗。 也就是说白了计算的扩容受到存储的制约无法灵活的扩容\缩容。 所以最好的情况就是做计算和存储的分离。 存储是存储计算是计算。 但对于传统的Hadoop集群来说做分离的话就对网络环境要求极高因为当数据无法在本地计算的时候就需要走网络传输。 那么交换机等内网性能就会是很大的挑战。 所以对于多数受到成本制约的公司来说存储和计算的分离是可选的原则因为其成本较高。 但是对于云平台来说就没有这方面的顾虑。 云平台基本上都是天生的计算和存储分离的。 如AWS的S3 作为存储其计算是和S3没有任何关联的。 如Azure的Blob存储其计算也和Blob无关 同样如阿里云的OSS存储也是和计算没有关系的。 所以如果要在云平台上实现数据湖那么一个天生的优势就是计算和存储很容易就分离了。 4.5.1.3. 原则3 Lambda架构 VS Kappa架构 VS IOTA架构 数据湖构建好后数据总要被利用、被分析。 而一个好的数据利用的架构可以高效的去处理数据湖内的海量数据。 数据处理的架构一般有 Lambda架构、Kappa架构、IOTA架构等。 关于这些架构请参阅后面的拓展章节内的介绍。 注这些架构并不属于数据湖架构而是指我们有了数据湖怎么去利用、去分析数据湖内数据的一些架构。 这些也是常见的大数据分析领域的架构。 4.5.1.4. 原则4 管理服务的重要性和选择合适的工具 数据湖不仅仅是一个存储那么简单存储是为了利用为了达到这一点我们需要 对数据进行安全管理对访问进行权限管控需要ETL等将数据汇入数据湖需要使用合适批处理、流处理去分析去计算用户前端工具如BI展示、REST API等等等一些列周边围绕的服务 也就是实现一个数据湖需要许多服务的协同配合不仅仅是存储那么简单所以这些管理服务以及相关的辅助工具对于数据湖来说是很重要的。 那么适当的管理服务可以帮助我们更加简便的设计数据湖的架构。 如上是数据湖架构的4个指导原则一般来说满足这4个指导原则就能构建出成功的数据湖架构。 5. 三、数据处理、数据应用的几种架构[拓展] 学习步骤 了解Lambda架构 本章为拓展章节课堂上仅做简单介绍同学们可以自行阅读理解或查阅互联网资料。 我们在前面说过数据湖内的数据在利用的时候 一般会遵循Lambda架构或者Kappa架构或IOTA架构等数据处理的架构思想为指导。 当然不遵循这两种架构思想也是可以的如果你有自己的想法去做设计也是没问题的。 只是一般Lambda架构和Kappa架构作为成熟的大数据分析架构用在处理数据湖内的数据也是很适合的。 5.1. 3.1 Lambda架构[重点了解] 下面下来看一段官方语气的介绍 5.1.1. 3.1.1 简介 Lambda架构是由Storm的作者Nathan Marz提出的一个实时大数据处理架构。Marz在Twitter工作期间开发了著名的实时大数据处理框架StormLambda架构是其根据多年进行分布式大数据系统的经验总结提炼而成。 Lambda架构的目标是设计出一个能满足实时大数据系统关键特性的架构包括有高容错、低延时和可扩展等。Lambda架构整合离线计算和实时计算融合不可变性Immunability读写分离和复杂性隔离等一系列架构原则可集成HadoopKafkaStormSparkHbase等各类大数据组件。 5.1.2. 3.1.2 Lambda架构关键特性 Marz认为大数据系统应具有以下的关键特性 Robust and fault-tolerant容错性和鲁棒性注1)对大规模分布式系统来说机器是不可靠的可能会宕机但是系统需要是健壮、行为正确的即使是遇到机器错误。除了机器错误人更可能会犯错误。在软件开发中难免会有一些Bug系统必须对有Bug的程序写入的错误数据有足够的适应能力所以比机器容错性更加重要的容错性是人为操作容错性。对于大规模的分布式系统来说人和机器的错误每天都可能会发生如何应对人和机器的错误让系统能够从错误中快速恢复尤其重要。Low latency reads and updates低延时很多应用对于读和写操作的延时要求非常高要求对更新和查询的响应是低延时的。Scalable横向扩容当数据量/负载增大时可扩展性的系统通过增加更多的机器资源来维持性能。也就是常说的系统需要线性可扩展通常采用scale out通过增加机器的个数而不是scale up通过增强机器的性能。General通用性系统需要能够适应广泛的应用包括金融领域、社交网络、电子商务数据分析等。Extensible可扩展需要增加新功能、新特性时可扩展的系统能以最小的开发代价来增加新功能。Allows ad hoc queries方便查询数据中蕴含有价值需要能够方便、快速的查询出所需要的数据。Minimal maintenance易于维护系统要想做到易于维护其关键是控制其复杂性越是复杂的系统越容易出错、越难维护。Debuggable易调试当出问题时系统需要有足够的信息来调试错误找到问题的根源。其关键是能够追根溯源到每个数据生成点。 注解1鲁棒是Robust的音译也就是健壮和强壮的意思。它也是在异常和危险情况下系统生存的能力。 5.1.3. 3.1.3 数据查询的本质 查询是什么概念Marz给出了一个简单的定义 Query Function(All data)该等式的含义是查询是应用于数据集上的函数。该定义看似简单却几乎囊括了数据库和数据系统的所有领域RDBMS、索引、OLAP、OLTP、MapReduce、EFL、分布式文件系统、NoSQL等都可以用这个等式来表示。 5.1.4. 3.1.4 Lambda的三层架构 有了上面对查询的定义下面我们来讨论大数据系统的关键问题 如何实时地在任意大数据集上进行查询 大数据再加上实时计算问题的难度比较大。最简单的方法就是根据前面的定义Query Function(All data) 在全体数据集上运行函数得到想要的结果。但是如果数据量非常大该计算的方式就代价太大了所以不现实。 那么Lambda架构通过分解为三层架构来解决此问题 Batch Layer批处理层Speed Layer速度层Serving Layer服务层 转存失败重新上传取消 那么这三层表达了什么意思我们一个一个来看一下。 5.1.5. 3.1.5 Batch Layer 批处理层 我们前面说过一般情况下任何的查询都可以表示为Query Function(All data)但是如果在数据量非常大的时候且还要支持实时查询就会消耗巨大的系统资源或者难以达到。 那么一个解决方式就是预运算查询函数precomputed query function) 预运算查询函数可以在系统空闲的时候根据业务需要的设计去运行查询分析作业然后生成结果我们称之为Batch view 批视图 那么有了这个Batch view后我们对数据的查询可以改为 Batch view Query Function(All data) Query Function(Batch view)从表达式中我们可以看出真正的业务查询实际上是查询的 批处理视图也即是我们预先准备好的数据内容。 我们也可以把这一步称之为中间数据生成 那么在Lambda架构中把Batch view的生成这一步就称之为Batch Layer 批处理层。 在Batch Layer中有两个特性 存储Master Dataset 这是一个持续增长的数据集对应All data也就是在数据湖中我们需要利用的数据集在Master Dataset上执行预计算函数构建查询所需的对应的view 我们把预处理结果称之为view通过view可以快速得到结果或者说对比query All data 简单太多 转存失败重新上传取消 可以看出预计算函数本质上就是一个批处理那么就比较适合使用MR、Spark等计算引擎进行处理。 并且采用这种形式生成的view均支持再次计算如果对view不满意或者执行错误重新执行一次即可。 该工作看似简单实质非常强大。任何人为或机器发生的错误都可以通过修正错误后重新计算来恢复得到正确结果。 5.1.6. 3.1.6 Speed Layer 速度层 Batch Layer可以很好的处理离线数据但是在我们的系统中有许多的实时增量数据而Speed Layer这一层就是用来处理实时增量数据的。 Speed Layer和Batch Layer比较类似Speed Layer层是对数据进行计算并生成一个Realtime View其主要区别在于 Speed Layer处理的数据是最近的增量数据流Batch Layer处理的全体数据集Speed Layer为了效率接收到新数据时不断更新Realtime View而Batch Layer根据全体离线数据集直接得到Batch View。Speed Layer是一种增量计算而非重新计算recomputationSpeed Layer因为采用增量计算所以延迟小而Batch Layer是全数据集的计算耗时比较长 综上所诉Speed Layer是Batch Layer在实时性上的一个补充。Speed Layer可总结为 realtime viewfunction(realtime viewnew data) Lambda架构将数据处理分解为Batch Layer和Speed Layer有如下优点 容错性。Speed Layer中处理的数据也不断写入Batch Layer当Batch Layer中重新计算的数据集包含Speed Layer处理的数据集后当前的Realtime View就可以丢弃这也就意味着Speed Layer处理中引入的错误在Batch Layer重新计算时都可以得到修正。这点也可以看成是CAP理论中的最终一致性Eventual Consistency的体现。复杂性隔离。Batch Layer处理的是离线数据可以很好的掌控。Speed Layer采用增量算法处理实时数据复杂性比Batch Layer要高很多。通过分开Batch Layer和Speed Layer把复杂性隔离到Speed Layer可以很好的提高整个系统的鲁棒性和可靠性。 通俗的说也就是Batch Layer根据其执行的时间间隔不断的将Batch View 涵盖的范围覆盖到新数据上。 而一旦Batch View执行到了某个时间点这个时间点之前的Realtime View就会被丢弃。 也就是说由于Batch View的高延迟在需要得到最新数据结果的时候由Realtime View做补充然后再后方Batch View不断的追赶进度。 5.1.7. 3.1.7 Serving layer 服务层 Lambda架构中的最高层Serving Layer层可以理解为用户层即响应用户的查询需求的层。 在Serving Layer中将合并Batch View 以及Realtime View的结果作为最终的View提供给用户查询。 那么换算为前面的表达式为Query Function(Batch View Realtime View) 上面分别讨论了Lambda架构的三层Batch LayerSpeed Layer和Serving Layer。总结下来Lambda架构就是如下的三个等式 batch view function(all data) realtime view function(realtime view, new data) # 其中参数中的Realtime view就是不断的对以后的Realtime View进行迭代更新直到被Batch View追上丢弃。 query function(batch view, realtime view)5.1.8. 3.1.8 Lambda架构的组件选型 根据上面对Lambda架构的理解我们可以对各个层的实现来做技术选型 Batch Layer 可以选用MR、Spark、SparkSQL等计算引擎Speed Layer 可以选用Storm、Flink、Spark StreamingServing Layer可以选用Mysql、Redis、HBase等数据库或缓存系统供用户查询将两个View的合并结果导入供查询 5.1.9. 3.1.9 Lambda架构的总结 Lambda架构可以总结为以下一些简单的语言 分为离线处理路径和实时处理路径两种处理模式离线处理和实时处理都会产生相应的中间数据离线的结果根据执行间隔不停的更新实时的结果不断的用新数据迭代。将离线的实时生成的中间数据进行合并抽取到一些数据库、缓存系统中作为服务层供用户查询。 5.1.10. 3.1.10 Lambda架构的缺点【拓展】 Lambda架构经过这么多年的发展已经非常的成熟其优点是稳定对于实时计算部分的成本可控而批处理部分可以利用晚上等空闲时间进行计算。这样把实时计算和离线处理的高峰错开来。 这种架构支撑了数据行业的早期发展但也有一些缺点 实时和批量结果不一致引起的冲突由架构中可以得知架构分实时和离线两部分两边结果的计算要保持一致就比较困难。理论上来说对于一些需要全量数据才能计算出的结果90%的数据计算已经由离线负责完成剩下10%是当前实时的计算结果对两个结果合并就能做到100%全量的处理并且保证低延迟。 但是这仅仅是理论上以及我们所期望达到的实际在应用的过程中因为各种原因导致这个时间没有对的上导致衔接处出现了一些数据遗漏或者数据重复就会让结果不准确。 并且当过了一段时间后离线部分追了上来对错误进行了修正又会导致在前端页面导致结果被修改的问题。 也就是说理论是OK的实施起来比较复杂难免出现问题对技术团队的能力有要求 批量计算无法在时限内计算完成在IOT时代数据量越来越多很多时候的凌晨空闲期有的时候都不够用了有的计算作业甚至会计算到大中午才结束这样的话离线部分就大大了落后进度了这导致实时的压力越来越大其不断递归迭代的更新数据view越来越困难。 开发和维护的问题由于要在两个不同的流程中对数据进行处理那么针对一个业务就产生了两个代码库一个离线计算、一个实时计算那么这样的话会让系统的维护更加困难。 服务器存储开销大由于View也就是中间数据的存在会导致计算出许多的中间数据用来支撑业务这样会加大存储的压力。ps: 目前存储的成本越来越低这个问题越来越不重要了 也即是由于Lambda架构的这些局限性Kappa架构应运而生它比Lambda架构更加的灵活我们在下面来看一下Kappa架构的相关细节。 5.2. 3.2 Kappa架构[了解] 针对Lambda架构的需要维护两套程序等以上缺点LinkedIn的Jay Kreps结合实际经验和个人体会提出了Kappa架构。 Kappa架构的核心思想是通过改进流计算系统来解决数据全量处理的问题使得实时计算和批处理过程使用同一套代码。 此外Kappa架构认为只有在有必要的时候才会对历史数据进行重复计算而如果需要重复计算时Kappa架构下可以启动很多个实例进行重复计算。 一个典型的Kappa架构如下图所示 转存失败重新上传取消 Kappa架构的核心思想包括以下三点 1.用Kafka或者类似MQ队列系统收集各种各样的数据你需要几天的数据量就保存几天。 2.当需要全量重新计算时重新起一个流计算实例从头开始读取数据进行处理并输出到一个新的结果存储中。 3.当新的实例做完后停止老的流计算实例并把老的一些结果删除。 Kappa架构的优点在于将实时和离线代码统一起来方便维护而且统一了数据口径的问题。而Kappa的缺点也很明显 ● 流式处理对于历史数据的高吞吐量力不从心所有的数据都通过流式计算即便通过加大并发实例数亦很难适应IOT时代对数据查询响应的即时性要求。 ● 开发周期长此外Kappa架构下由于采集的数据格式的不统一每次都需要开发不同的Streaming程序导致开发周期长。 ● 服务器成本浪费Kappa架构的核心原理依赖于外部高性能存储redis,hbase服务。但是这2种系统组件又并非设计来满足全量数据存储设计对服务器成本严重浪费。 lambda 架构kappa 架构数据处理能力可处理超大规模的历史数据历史数据处理能力有限机器开销批处理和实时计算需一直运行机器开销大必要时进行全量计算机器开销相对较小存储开销只需要保存一份查询结果存储开销较小需要存储新老实例结果存储开销相对较大。但如果是多 Job 共用的集群则只需要预留出一小部分的存储即可开发、测试难易程度实现两套代码开发、测试难度较大只需面对一个框架开发、测试难度相对较小运维成本维护两套系统运维成本大只需维护一个框架运维成本小 5.3. 3.3 IOTA架构[了解] 而在IOT大潮下智能手机、PC、智能硬件设备的计算能力越来越强而业务需求要求数据实时响应需求能力也越来越强过去传统的中心化、非实时化数据处理的思路已经不适应现在的大数据分析需求提出新一代的大数据IOTA架构来解决上述问题 整体思路是设定标准数据模型通过边缘计算技术把所有的计算过程分散在数据产生、计算和查询过程当中以统一的数据模型贯穿始终从而提高整体的预算效率同时满足即时计算的需要可以使用各种Ad-hoc Query(即席查询)来查询底层数据 转存失败重新上传取消 IOTA整体技术结构分为几部分 ● Common Data Model贯穿整体业务始终的数据模型这个模型是整个业务的核心要保持SDK、cache、历史数据、查询引擎保持一致。对于用户数据分析来讲可以定义为“主-谓-宾”或者“对象-事件”这样的抽象模型来满足各种各样的查询。以大家熟悉的APP用户模型为例用“主-谓-宾”模型描述就是“X用户 – 事件1 – A页面(2018/4/11 20:00) ”。当然根据业务需求的不同也可以使用“产品-事件”、“地点-时间”模型等等。模型本身也可以根据协议(例如 protobuf)来实现SDK端定义中央存储的方式。此处核心是从SDK到存储到处理是统一的一个Common Data Model。 ● Edge SDKs Edge Servers这是数据的采集端不仅仅是过去的简单的SDK在复杂的计算情况下会赋予SDK更复杂的计算在设备端就转化为形成统一的数据模型来进行传送。例如对于智能Wi-Fi采集的数据从AC端就变为“X用户的MAC 地址-出现- A楼层(2018/4/11 18:00)”这种主-谓-宾结构对于摄像头会通过Edge AI Server转化成为“X的Face特征- 进入- A火车站(2018/4/11 20:00)”。也可以是上面提到的简单的APP或者页面级别的“X用户 – 事件1 – A页面(2018/4/11 20:00) ”对于APP和H5页面来讲没有计算工作量只要求埋点格式即可。 ● RealTime Data实时数据缓存区这部分是为了达到实时计算的目的海量数据接收不可能海量实时入历史数据库那样会出现建立索引延迟、历史数据碎片文件等问题。因此有一个实时数据缓存区来存储最近几分钟或者几秒钟的数据。这块可以使用Kudu或者Hbase等组件来实现。这部分数据会通过Dumper来合并到历史数据当中。此处的数据模型和SDK端数据模型是保持一致的都是Common Data Model例如“主-谓-宾”模型。 ● Historical Data历史数据沉浸区这部分是保存了大量的历史数据为了实现Ad-hoc查询将自动建立相关索引提高整体历史数据查询效率从而实现秒级复杂查询百亿条数据的反馈。例如可以使用HDFS存储历史数据此处的数据模型依然SDK端数据模型是保持一致的Common Data Model。 ● DumperDumper的主要工作就是把最近几秒或者几分钟的实时数据根据汇聚规则、建立索引存储到历史存储结构当中可以使用map-reduce、C、Scala来撰写把相关的数据从Realtime Data区写入Historical Data区。 ● Query Engine查询引擎提供统一的对外查询接口和协议(例如SQL JDBC)把Realtime Data和Historical Data合并到一起查询从而实现对于数据实时的Ad-hoc查询。例如常见的计算引擎可以使用presto、impala、clickhouse等。 ● Realtime model feedback通过Edge computing技术在边缘端有更多的交互可以做可以通过在Realtime Data去设定规则来对Edge SDK端进行控制例如数据上传的频次降低、语音控制的迅速反馈某些条件和规则的触发等等。简单的事件处理将通过本地的IOT端完成例如嫌疑犯的识别现在已经有很多摄像头本身带有此功能。 IOTA大数据架构主要有如下几个特点 ● 去ETL化ETL和相关开发一直是大数据处理的痛点IOTA架构通过Common Data Model的设计专注在某一个具体领域的数据计算从而可以从SDK端开始计算中央端只做采集、建立索引和查询提高整体数据分析的效率。 ● Ad-hoc即时查询鉴于整体的计算流程机制在手机端、智能IOT事件发生之时就可以直接传送到云端进入realtime data区可以被前端的Query Engine来查询。此时用户可以使用各种各样的查询直接查到前几秒发生的事件而不用在等待ETL或者Streaming的数据研发和处理。 ● 边缘计算(Edge-Computing)将过去统一到中央进行整体计算分散到数据产生、存储和查询端数据产生既符合Common Data Model。同时也给与Realtime model feedback让客户端传送数据的同时马上进行反馈而不需要所有事件都要到中央端处理之后再进行下发。 转存失败重新上传取消 如上图IOTA架构有各种各样的实现方法为了验证IOTA架构很多公司也自主设计并实现了“秒算”引擎 在大数据3.0时代Lambda大数据架构已经无法满足企业用户日常大数据分析和精益运营的需要去ETL化的IOTA大数据架构也许才是未来。 6. 四、数据湖基于Hadoop、Spark的实现[掌握] 学习步骤 一般企业基于Hadoop、Spark是如何构建数据湖的数据湖的核心是什么 我们前面花费了3个章节来全面赘述了数据湖的相关概念和理论内容着实是有些臭长。 但是这些概念却不能够省略掉。 数据湖从提出到现在也不过7、8年的时间目前还处于完善的阶段并没有一个严格的执行标准。 所以我们对于概念的理解就直接影响数据湖的实现而数据湖实现的好和坏也影响着公司数据治理的高度。 目前数据湖概念在逐步被企业接受但是在实现上每个企业都不尽相同 企业对于数据湖的实现遵循数据湖的基本概念 能够实现任意数据输入能够实现集中存储能够提供分析能力 那么企业只要遵循上面这些要求去构建数据湖即可在实现的过程中使用了什么数据框架应用了哪些技术这些都是企业自己去做的决定的自己把握。 ps也就是只要能满足数据湖的概念要求具体企业爱怎么实现就怎么实现你用一堆磁盘实现数据湖的存储都可以只要满足要求。 那么尽管每个企业的实现都不尽相同但总归是有些架构是用的较多的大多数都会选择的。 那么课程就根据最具有普遍性的架构设计来给同学们讲解一下数据湖的具体实现 转存失败重新上传取消 如图是比较典型的基于Hadoop、Spark生态的一种常规数据湖实现架构。 其中 数据湖的核心就是由HDFS提供的存储层以及构建在HDFS之上的包括权限管控、安全授权、审计、元数据管理等一系列数据管理工具基于HDFS存储层的核心可以接受由Kafka、FLume、Sqoop、或其它数据工具的任意格式的数据输入在HDFS存储层之上构建了由Spark、MR等计算框架对数据进行处理的数据处理层。并由数据处理层的产出可以导出至如数据仓库、HBase、Mysql、或其它需要应用到处理后数据的地方并最终由数据展示和提供层对外提供数据产品。 由上可以得知数据湖的核心实现就是存储层以及围绕在其之上的一系列数据应用和数据治理的服务。 那么接下来我们来解析一下这样架构下的数据湖的核心。 6.1. 4.1 数据湖的核心 海量的任意格式原始数据存储海量数据分析利用的能力 6.1.1. 4.1.1 存储层 由HDFS提供的数据存储层应该是比较好理解的我们提及到数据湖本质上就是数据的集中存储那么由HDFS提供数据的集中存储是合适的。 HDFS本身是一款分布式文件系统除了能够提供存储支撑外也能提供 高可用性可拓展性可靠性易用性 等一系列的优势特性。 也满足了对数据湖要求中的 可拓展的可靠的吞吐量原有格式存储支持多种数据源的输入多分析框架的支持 6.1.2. 4.1.2 数据管理 数据湖核心除了存储以外也包含了数据管理的内容。 我们可以想象数据湖作为一个企业内海量数据的集中存储那么就不仅仅是个大型网盘而已对数据的管理也是必要的功能。 6.1.2.1. 1. 安全 数据湖需要安全方面的管控常规的我们在hadoop实现安全访问一般会使用Kerberos来实现。 Kerberos是一款安全框架可以和Hadoop无缝集成基于身份认证来提供授权管理 同时也会配合HDFS本身的ACL权限控制来辅助做安全管理。 6.1.2.2. 2. 审计 大型企业对于审计也是有要求的不过目前暂未有成熟的针对Hadoop进行审计的框架和平台。 目前多数的做法是开启Namenode的审计日志然后将日志导入到其它日志处理框架中如elasticsearch进行审计操作。 6.1.2.3. 3. 元数据管理 元数据管理对于数据湖而言也是非常重要的。 试想一下数据湖汇聚来自企业各方面的数据那么湖内的数据就会又多又杂如果不能对数据进行很好的归纳管理以及元数据管理的话就很容易让数据湖变成数据沼泽 也就是没有元数据管理数据湖内存放的就是垃圾 我们对于数据进行分析处理产生的二次产出、三次产出等中间数据也是需要做元数据管理不然也会导致数据湖内的数据混乱。 同时元数据管理带来的好处还在于我们可以对元数据进行检索查询除了能够快速定位自己需要的数据内容也可以帮组我们快速找到符合我们需要的数据。 比如想要在数据湖内找到关于订单相关的数据同时要包含有如时间、用户、订单号等字段那么对于元数据的检索就可以快速帮组我们来找到需要的这些数据或者说帮我们找到有没有符合要求的数据。 6.1.3. 4.1.3 数据处理 除了存储层和 数据管理以外 数据的处理在数据湖中也是重中之重。 毕竟不管数据存的好还是数据管的好最终还是要落到数据用的好。 不过数据处理的实现就可以脱离出数据湖架构之外单独进行架构设计了。 如我们前面简单介绍过的Lambda、Kappa、IOTA等架构就是对数据处理、数据应用的一些成熟的架构体系。 当然如果你有自己的设计架构也是可以的。不一定要使用别人提供的架构适合自己的才是最好的。 一般而言通用的数据处理也就是大家大差不差都差不多方式都会涉及到数据的提炼也就是对数据进行处理产生对应适合业务的 中间数据也就是Lambda架构中的view) 那么对于这样的需求一般通用的处理就是根据公司的业务使用Spark、MR、Flink等框架对数据进行分析处理以得到满足业务需求的数据结果。 并最终利用这些内容导出至适合的场景内进行利用如导入到数仓中为数仓提供数据来源或者导出到其它数据存储如Mysql HBase MongoDB等用以支撑业务。 并最终作为公司的数据产品提供服务。 那么这就是数据湖架构的数据扭转全链条。 -总结 不同的使用方式架构方案在企业里面的定位就不同。数据湖和普通的大数据分析处理架构基本上一模一样存储层HDFS配合其他的辅助功能如安全管理、权限管理、审计、元数据管理分析层Lambda、IOTA、Kappa等架构哎实现实际上大多数都是基于迭代中间数据这样的概念来生产业务可用的数据结果。 我们给同学们介绍了使用Hadoop、Spark生态构建数据湖的常见架构。 但是在这样的实现下还是有一定的不足之处的。 那么这些不足之处是什么以及是如何解决的就交由本次课程的重点Delta Lake来进行解答 7. 五、Delta Lake - 数据湖核心的增强[重点] 学习步骤 掌握Delta lake的基础概念掌握Delta Lake的重点特性掌握Delta Lake的使用形式 7.1. 5.1 什么是Delta Lake 转存失败重新上传取消 Delta Lake是由Spark的商业化公司也就是大名鼎鼎的砖厂Databricks所推出并开源的一款 基于HDFS的存储层框架 Reliable Data Lakes at Scale 是Delta Lake的口号 构建大规模的可靠的数据湖 转存失败重新上传取消 由上图可以得知Delta Lake本质上就是 一款开源的存储层将ACID事务引入到了Spark以及大数据工作负载中 由此可见Delta Lake 作为一款存储层框架是通过拓展Spark的功能通过Spark作为媒介来实现存储层面的增强。 7.2. 5.2 Delta Lake 有什么特性 Delta Lake 带来了许多的特性这些特性可以说就是针对我们前面所说的Hadoop体系中构建数据湖的不足的。 ACID 事务控制 数据湖通常具有多个同时读取和写入数据的数据管道并且由于缺乏事务数据工程师必须经过繁琐的过程才能确保数据完整性。 Delta Lake将ACID事务带入您的数据湖。它提供了可序列化性最强的隔离级别。 可伸缩的元数据处理 在大数据中甚至元数据本身也可以是“大数据”。 Delta Lake将元数据像数据一样对待利用Spark的分布式处理能力来处理其所有元数据。这样Delta Lake可以轻松处理具有数十亿个分区和文件的PB级表。 数据版本控制 Delta Lake提供了数据快照使开发人员可以访问和还原到较早版本的数据以进行审核回滚或重现实验。 开放的数据格式 Delta Lake中的所有数据均以Apache Parquet格式存储从而使Delta Lake能够利用Parquet固有的高效压缩和编码方案。 统一的批处理和流处理的source 和 sink: Delta Lake中的表既是批处理表又是流计算的source 和 sink。流数据提取批处理历史回填和交互式查询都可以直接使用它。 Schema执行 Delta Lake提供了指定和执行模式的功能。这有助于确保数据类型正确并且存在必需的列从而防止不良数据导致数据损坏。 Schema演化 大数据在不断变化。 Delta Lake使您可以更改可自动应用的表模式而无需繁琐的DDL。 审核历史记录 Delta Lake事务日志记录有关数据所做的每项更改的详细信息从而提供对更改的完整审核跟踪。 更新和删除 Delta Lake支持Scala / Java API进行合并更新和删除数据集。 100%和Apache Spark的API兼容 开发人员可以将Delta Lake与现有的数据管道一起使用而无需进行任何更改因为它与常用的大数据处理引擎Spark完全兼容。 7.3. 5.3 Delta Lake 重点特性解读 在第四章的时候我们说过基于Hadoop、Spark生态的数据湖实现是有一些不足的以及在上一节我们提到Delta Lake的这些特性就是为了解决 Hadoop、Spark架构下数据湖实现的不足之处的。 那么这些特性到底解决了什么问题呢 7.3.1. 中间数据 首先先来理解一下中间数据这个概念 数据湖内的原始数据直接利用在业务分析上是比较困难的。 一个主要原因就是我们在构建数据湖的时候汇入的数据是基于数据湖的指导原则的数据和业务分离 也就是说这些数据是其最原始的样子并不贴合业务分析的需求。 一般情况下企业都会对原始数据进行一次、二次、乃至多次的迭代处理将这些数据分阶段、分步骤的逐步处理成业务想要的样子这样就更适合做业务分析。 那么这些迭代处理所产生的一系列数据文件我们称之为中间数据 转存失败重新上传取消 PS: 其实这种分析模式就是Lambda架构中对于批离线数据的处理方式。 中间数据也就是Lambda架构中的Batch View 7.3.1.1. ACID 事务控制 在基于中间数据这种处理模式下Hadoop、Spark生态构建数据湖的一个不足之处就在于在数据处理的过程中没有事务控制。 原因1 在数据转换的过程中如果出现问题造成了数据处理的不完整这就会导致基于此数据的后续操作均产生了偏差。 而修复这些偏差就需要耗费工程师很大的精力特别在数据量大的时候。 原因2 生成的中间数据并不只会有一个人在用如果多个人对同一个中间数据进行了修改、更新操作就会产生冲突 而这种冲突也会造成数据迭代链条的断裂。 Delta Lake实现了事务日志的记录对于数据的任何操作都记录在事务日志里面同时也基于事务日志实现了ACID的事务控制。 所以ACID级别的事务控制可以有效的帮助工程师控制中间数据迭代的过程并避免冲突。 7.3.1.2. 数据版本控制 同样对于一份中间数据可能被我们折腾了多次版本更新后发现最初的样子才是最好的样子。 但是中间数据已经被我们修改的面目全非了怎么办 这就是Hadoop Spark生态构建数据湖的第二个不足之处没有数据版本控制 Delta Lake带来了这个特性可以让我们随时随地的回退到数据在任何时间点之上的版本。 注意是任意版本。 也就是说从这个数据被创建到最新的状态这中间任何时间点的版本均可回退。 这就给工程师们倒腾数据提供了一个强有力的支撑再也不怕折腾废了 所以数据版本控制对于构建数据湖生态体系同样重要 7.3.1.3. 可伸缩的元数据处理 我们已经知道Delta Lake可以帮助我们控制事务以及进行任意时间点的数据回滚操作。 那么如果某些中间数据经过了超多次的版本更新并且其数据内容非常巨大。 对于这样的情况如何做到任意时间点的回滚呢 这就是Delta Lake的另一个强大之处强大的元数据处理能力 在Delta Lake的设计中元数据数据的事务日志也是当成一种普通的数据对待。 对于元数据的处理当成一种普通的Spark任务去做应用Spark强大的分布式并行计算能力可以完成对超大规模的数据的管理和溯源。 7.3.1.4. 审核历史记录 在这个图中我们可以看到对于数据的审计同样是数据湖需要实现的功能之一。 基于Delta Lake的事务日志除了能够提供事务控制、数据版本控制以外同样可以通过对事务日志的检索来做数据的审查。 这样更能清楚的知道在什么时间点做了什么操作改了哪些内容删了什么东西。 这一特性对企业来说同样重要. 7.3.1.5. 统一的批处理和流处理的source 和 sink: Delta Lake的表可以作为离线统计的输出 同样也可以作为 流式计算的 Source 以及Sink 也就是说不管是 离线批处理还是实时流计算都可以对同一张表同一个Schema进行操作。 这样让流和批统一起来更加适合企业的架构。 由图可以看出对于Delta Lake表的操作 不分流和批调用SparkAPI 可以直接对Delta Lake Table进行操作 因为Delta Lake还有一个特性就是100%兼容Spark APISpark API可以直接对Delta Lake Table进行操作。 7.4. 5.4 Delta Lake 的使用形式 我们前面提到过企业对于数据湖内海量的数据存储利用的方式大多数会遵循产生中间数据、迭代中间数据的方式来进行也就是前面我们看到的这个图。 在现阶段实现这样的分析流程多数的时候企业会选择使用Spark、SparkSQL来进行处理。 因为不管原始数据是什么样子一般到中间数据这里数据都基本上是结构化的数据。 结构化的数据又特别适合使用SparkSQL来进行分析处理。 同时这些中间数据一般会选择存储为Parquet文件格式进行存储。 那么这里当企业加入了Delta Lake之后其分析处理的逻辑依旧不变存储的格式依旧是存储Parquet格式。 只不过原本使用SparkSQL进行数据处理的时候SparkSQL是不具备如 事务控制数据版本控制元数据管理等一系列功能的 在引入了Delta Lake之后依旧使用SparkSQL的方式操作数据但是这些特性就会随着Delta Lake带来。 如下图 这张图是官方给出的我们可以看到Delta Lake 是架构在你已存在的数据湖之上HDFS、S3、Azure 数据存储。 我们可以认为Delta Lake就是SparkSQL或者说Spark的一个插件一个外挂。 增强了SparkSQL了功能同时并不改变你的使用方式。 同时你对数据的分析模式也不会因为Delta Lake的加入发生改变 如图官方在图里面使用了很形象的比喻来说明了对数据处理的流程 数据从 Bronze(青铜)也就是原始数据经过分析转变成了Silver(白银)也就是我们说的中间数据并最终转换为Gold(黄金)这样的数据就可以直接被业务所利用所分析了。 Delta Lake的使用形式 本质上还是使用原有的Spark、SparkSQL的方式来处理数据 处理的流程也不变还是原有的对中间数据进行迭代的方式多跳架构) 变化的地方在于存储数据的过程中加入了Delta Lake的支持。也就是使用Delta Lake提供的API进行数据的存储管理 数据分析就和打排位赛一样。 我们初登场的时候就是青铜组选手原始数据 只有我们不断的进行提取、进行处理数据才会升级并最终达到黄金段位、达到钻石、达到业务所需。 8. 六、Delta Lake - Quickstart 学习步骤 掌握Delta Lake 的安装和启动掌握简单的Delta Lake操作 8.1. 6.1 安装 Delta Lake的安装非常简单严格来说Delta Lake不需要安装它包含在最新的Spark发行版中。 要求使用的Spark版本2.4.2 我们可以使用两种方式来应用Delta Lake 互动式使用Delta Lake启动Spark shellScala或Python并在shell中交互运行代码段。包含在工程中使用Delta Lake设置Maven或SBT项目Scala或Java来应用Delta Lake 8.1.1. 6.2 互动式 8.1.1.1. PySpark 如果要在PySpark中启动请先更新PySpark的版本 pip install --upgrade pysparkRun PySpark with the Delta Lake package: pyspark --packages io.delta:delta-core_2.11:0.5.08.1.1.2. Spark Scala Shell bin/spark-shell --packages io.delta:delta-core_2.11:0.5.0首次打开需要网络连接会联网下载Delta Lake相关的包 8.1.2. 6.3 包含在工程中 8.1.2.1. Maven dependencygroupIdio.delta/groupIdartifactIddelta-core_2.11/artifactIdversion0.5.0/version /dependency8.1.2.2. Scala SBT libraryDependencies io.delta %% delta-core % 0.5.08.1.3. 6.4 创建表 本文中的所有操作均以Scala代码做演示。 在适当的地方会贴出Java和Python的同样操作代码 打开Spark Shell执行如下代码 val data spark.range(0, 5) data.write.format(delta).save(/tmp/delta-table)这样我们就创建了一个0到5的数据range并保存为了Delta Lake表 同时可以在HDFS的目录中看到如下图一堆的Parquet文件。 这些Parquet文件就是保存的具体数据文件。 转存失败重新上传取消 8.1.3.1. 补充Python和Java的操作方式 Python data spark.range(0, 5) data.write.format(delta).save(/tmp/delta-table)Java import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row;SparkSession spark ... // create SparkSessionDatasetRow data data spark.range(0, 5); data.write().format(delta).save(/tmp/delta-table);8.1.4. 6.5 读取数据 我们刚刚保存了一个0-5的range数据到delta lake的一个table中(Parquet文件)。 现在我们来读取这个文件并查看内容 Spark Shell val df spark.read.format(delta).load(/tmp/delta-table) df.show()我们可以看到可以正常的读出内容 8.1.4.1. 补充Python和Java的操作方式 Python df spark.read.format(delta).load(/tmp/delta-table) df.show()Java DatasetRow df spark.read().format(delta).load(/tmp/delta-table); df.show();8.1.5. 6.6 更新数据 现在我们来尝试更新一下这个数据表Parquet文件 Spark Shell val data spark.range(5, 10) data.write.format(delta).mode(overwrite).save(/tmp/delta-table) df.show()我们使用一个新的 5 - 10 的range来覆盖这个表的内容 我们来重新读取一下 val df spark.read.format(delta).load(/tmp/delta-table) df.show()8.1.5.1. 补充Python和Java的操作方式 Python data spark.range(5, 10) data.write.format(delta).mode(overwrite).save(/tmp/delta-table)Java DatasetRow data data spark.range(5, 10); data.write().format(delta).mode(overwrite).save(/tmp/delta-table);8.1.6. 6.7 有条件的更新而不覆盖 Delta Lake提供了编程API可以有条件地将数据更新删除和合并向上插入到表中。这里有一些例子。 Spark Shell import io.delta.tables._ import org.apache.spark.sql.functions._val deltaTable DeltaTable.forPath(/tmp/delta-table)// 通过将每个偶数值加100来更新每个偶数值 deltaTable.update(condition expr(id % 2 0),set Map(id - expr(id 100)))// 删除偶数 deltaTable.delete(condition expr(id % 2 0))// 合并新数据 val newData spark.range(0, 20).toDFdeltaTable.as(oldData).merge(newData.as(newData),oldData.id newData.id).whenMatched.update(Map(id - col(newData.id))).whenNotMatched.insert(Map(id - col(newData.id))).execute()deltaTable.toDF.show()8.1.6.1. 补充Python和Java的操作方式 Python from delta.tables import * from pyspark.sql.functions import *deltaTable DeltaTable.forPath(spark, /tmp/delta-table)# Update every even value by adding 100 to it deltaTable.update(condition expr(id % 2 0),set { id: expr(id 100) })# Delete every even value deltaTable.delete(condition expr(id % 2 0))# Upsert (merge) new data newData spark.range(0, 20)deltaTable.alias(oldData) \.merge(newData.alias(newData),oldData.id newData.id) \.whenMatchedUpdate(set { id: col(newData.id) }) \.whenNotMatchedInsert(values { id: col(newData.id) }) \.execute()deltaTable.toDF().show()Java import io.delta.tables.*; import org.apache.spark.sql.functions; import java.util.HashMap;DeltaTable deltaTable DeltaTable.forPath(/tmp/delta-table);// Update every even value by adding 100 to it deltaTable.update(functions.expr(id % 2 0),new HashMapString, Column() {{put(id, functions.expr(id 100));}} );// Delete every even value deltaTable.delete(condition functions.expr(id % 2 0));// Upsert (merge) new data DatasetRow newData spark.range(0, 20).toDF();deltaTable.as(oldData).merge(newData.as(newData),oldData.id newData.id).whenMatched().update(new HashMapString, Column() {{put(id, functions.col(newData.id));}}).whenNotMatched().insertExpr(new HashMapString, Column() {{put(id, functions.col(newData.id));}}).execute();deltaTable.toDF().show();8.1.7. 6.8 使用时间旅行读取旧版本的数据 我们说过Delta Lake支持数据的版本控制那么还记得我们对这个一直使用的表做了哪些更改吗来回顾一下 初始阶段创建了一个0-5的range 表内数据应该是0, 1, 2, 3, 4这是版本0然后被覆盖为了5 - 10 的range表内数据应该是5, 6, 7, 8, 9这是版本1然后对偶数都加了100表内数据应该是5, 106, 7, 108, 9这是版本2然后删除了所有偶数表内数据应该是5, 7, 9这是版本3最后做了一次合并表内数据应该是0 - 20 的range这是版本4也是当前最新版本。 我们来尝试回退到版本0 Spark Shell val df spark.read.format(delta).option(versionAsOf, 0).load(/tmp/delta-table) df.show()转存失败重新上传取消 可以发现我们读取到了最初的版本0的内容。 8.1.7.1. 补充Python和Java的操作方式 Python df spark.read.format(delta).option(versionAsOf, 0).load(/tmp/delta-table) df.show()Java DatasetRow df spark.read().format(delta).option(versionAsOf, 0).load(/tmp/delta-table); df.show();我们可以尝试将版本数从0修改为其它数来看一下是否能读到各个版本的数据。 我们对/tmp/delta-table这个表的修改版本最大到版本4如果你读取版本4就会报错哦 它会提示我们版本只有[0, 4] 如果想要回滚到某个版本只需要将数据读出后然后使用overwrite的方式在写回去就好了哦。 比如将数据回滚为版本0的状态 Spark Shell spark.read.format(delta).option(versionAsOf, 0).load(/tmp/delta-table).write.format(delta).mode(overwrite).save(/tmp/delta-table)spark.read.format(delta).load(/tmp/delta-table).showPS: 要注意哦回滚其实也是一种更新这样操作会产生版本5的哦。 这样的机制也是保障就算回滚了也能反向再滚回去。 8.1.8. 6.9 事务日志 我们说过其能实现ACID事务以及实现版本控制是基于其事务日志的。 事务日志位于表目录下的_delta_log文件夹 打开这个文件夹 可以发现有6个JSON文件这6个JSON文件其实就是对应我们刚刚操作表的6个版本0, 1, 2, 3, 4, 5 我们打开00000.json {commitInfo:{timestamp:1581511770889,operation:WRITE,operationParameters:{mode:ErrorIfExists,partitionBy:[]},isBlind Append:true}} {protocol:{minReaderVersion:1,minWriterVersion:2}} {metaData:{id:250ed355-a118-4510-b60a-6fa16c6a3ec0,format:{provider:parquet,options:{}},schemaString:{\type\:\struc t\,\fields\:[{\name\:\id\,\type\:\long\,\nullable\:true,\metadata\:{}}]},partitionColumns:[],configuration:{},crea tedTime:1581511770204}} {add:{path:part-00000-316f67f3-4c58-48f9-baf5-39a3cf5a335b-c000.snappy.parquet,partitionValues:{},size:429,modificationTime :1581511770855,dataChange:true}} {add:{path:part-00001-cf6de084-a74c-4563-adcb-57b3f6fb88a2-c000.snappy.parquet,partitionValues:{},size:429,modificationTime :1581511770854,dataChange:true}} {add:{path:part-00002-8321221c-d29f-42d5-a3e3-36d35746d5f4-c000.snappy.parquet,partitionValues:{},size:429,modificationTime :1581511770854,dataChange:true}} {add:{path:part-00003-2ec4f7a3-b089-4f32-a256-7791d586de81-c000.snappy.parquet,partitionValues:{},size:437,modificationTime :1581511770854,dataChange:true}}可以看到这里记录了对这个文件的提交记录。 再打开00005.json {commitInfo:{timestamp:1581514597463,operation:WRITE,operationParameters:{mode:Overwrite,partitionBy:[]},readVersion :4,isBlindAppend:false}} {add:{path:part-00000-61faefe4-3aab-4f65-813c-d2eb15c9460a-c000.snappy.parquet,partitionValues:{},size:437,modificationTime :1581514596698,dataChange:true}} {add:{path:part-00001-5dd5f4e2-c285-4c6a-859d-b170b81e21ed-c000.snappy.parquet,partitionValues:{},size:429,modificationTime :1581514596694,dataChange:true}} {add:{path:part-00002-50f9ac71-3de7-43fb-9338-0c46ed013afd-c000.snappy.parquet,partitionValues:{},size:429,modificationTime :1581514596697,dataChange:true}} {add:{path:part-00003-74dcb969-8d4d-4b47-9d6a-766c4a70ec00-c000.snappy.parquet,partitionValues:{},size:429,modificationTime :1581514596699,dataChange:true}} {remove:{path:part-00112-10fe4d5b-8fbc-4001-80b2-03ea399c7de7-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00190-1e873ada-5a1e-466e-8134-ad0ee0b55ab6-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00116-ccfc3385-4b6b-4a8e-9e9f-c0dd026b36e2-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00077-ca3f3d5c-776b-4bba-9f6b-cd5f7d4617c4-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00011-d66aa330-1718-49dc-a586-e60c351da6c5-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00004-c8c139fd-d44d-4461-a039-0a4146f0d629-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00005-c4dc991a-7587-444e-ba54-bd23bddcc4b6-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00164-91fbad05-f2e4-4869-90b0-e558b91d5606-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00068-c661606b-8d9f-4dcf-bf6c-6d7f6eccbed3-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00000-ca2eb3fa-bb2e-4ce3-9c90-6031cd7643a1-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00121-71afe54a-7318-44ca-adff-1e12268fd851-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00045-d369dda1-1463-48b2-b73f-2d8ee5b42f8e-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00049-a8abf986-2d78-4a67-8d5c-c1461861a914-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00058-aa375fb3-f2b5-4828-9f1c-a7ee7e49925b-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00128-28f91ced-b8ad-4f13-bc27-966afcff3dd1-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00154-013c0e3d-7741-42d6-ae5c-902d377daaf0-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00140-faf02234-4b02-4987-88d7-0425c385fb65-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00143-60822e7c-eb34-4393-bc20-2b12bd52af0b-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00150-9610c09a-a9d7-4aca-9b80-ad0769b5b772-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00069-31211d0f-9e99-4598-ad09-3c260e0db4b0-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}} {remove:{path:part-00107-81fdfd86-0b13-446e-afd0-7732271a4042-c000.snappy.parquet,deletionTimestamp:1581514597463,dataChange: true}}可以看到这里有remove操作也有add操作就是对应了我们最后一次将数据回滚到版本0的状态实际上是产生了版本5的更新这次操作将数据从20行变为了5行。 表里面由36个Parquet。 实际上我们就记录5行数据用得着这么多的Parquet吗 其实这里面的这么多Parquet每一个都有用因为执行了这么多次的版本更新从元数据事务日志到数据文件都会可以循迹的。 当然如果你只查询最新版本的话 也就这4个Parquet对你有用了。 以上是对Delta Lake的快速入门使用下面我们来详细的使用一下Delta Lake的各种操作。 9. 七、Delta Lake 操作[熟练] 学习步骤 掌握常用的Delta Lake操作 9.1. 7.1 表批量读写 创建表、写入表、更新表、版本控制等我们在QuickStart已经学习过了这里就省略了。 9.1.1. 对数据进行分区 您可以对数据进行分区以加快查询。要在创建增量表时对数据进行分区请按列指定分区。常见的模式是按日期分区例如 df.write.format(delta).partitionBy(date).save(/delta/events)我们来演示一下并回顾一下传统的SparkSQL开发流程 在HDFS上准备了如下文件 [rootst1 ~]# hadoop fs -cat /goods.txt 娃哈哈,5,2011-01-01 薯片,6,2011-01-01 百事可乐,1,2012-01-01 可口可乐,3,2009-01-01 浪味仙,6,2014-01-01 旺仔牛奶,11,2015-01-01 旺仔小馒头,22,2013-01-01 辣条,33,2016-01-01 瓜子,55,2008-01-01 奥利奥,11,2007-01-01 六个核桃,26,2006-01-01 冰红茶,22,2005-01-01 绿茶,3,2004-01-01 奶茶,2,2003-01-01 小浣熊,2,2002-01-01 旺旺仙贝,1,2001-01-01 康师傅方便面,1,2000-01-01 今麦郎,2,1999-01-01 农夫山泉,6,1998-01-01# 创建case class构建schema case class Goods(name:String, price:Int, date:String)# 创建数据RDD val lineRDD sc.textFile(/goods.txt).map(_.split(,))# 将RDD和Schema关联 val schemaRDD lineRDD.map(line Goods(line(0), line(1).toInt, line(2)))# 将RDD转换为DataFrame val dataDF schemaRDD.toDF# show一下 dataDF.show# 好的我们已经得到了一个DataFrame我们来尝试一下将其写成Delta Lake表并使用时间分区 dataDF.write.format(delta).partitionBy(date).save(/tmp/delta/events)可以看到以时间为分区区分了不同的文件夹存储数据了 9.1.2. 追加数据 构建两条新数据 val dataDF2 sc.makeRDD(List(Goods(append1,5,1996-01-01), Goods(append2,10,1997-01-01))).toDF然后执行追加 # 我们前面测试的时候表示用date分区了这里可以继续指定分区也可以不指定都可以的。 dataDF2.write.format(delta).mode(append).partitionBy(date).save(/tmp/delta/events)9.2. 7.2 Schema验证 Delta Lake自动验证正在写入的DataFrame的架构与表的架构兼容。 Delta Lake使用以下规则来确定从DataFrame到表的写入是否兼容 所有DataFrame列都必须存在于目标表中。如果表中不存在DataFrame中的列则会引发异常。表中存在但DataFrame中不存在的列设置为null。DataFrame列数据类型必须与目标表中的列数据类型匹配。如果它们不匹配则会引发异常。Dataframe 列名不能只根据大小写不同。这意味着您不能在同一表中定义诸如“ Foo”和“ foo”之类的列。虽然可以在区分大小写或不区分大小写默认模式下使用Spark但在存储和返回列信息时Parquet区分大小写。 Delta Lake保留大小写但在存储架构时不敏感并且具有此限制以避免潜在的错误数据损坏或丢失问题。 如果您将其他选项例如partitionBy与附加模式结合使用则Delta Lake会验证它们是否匹配并为任何不匹配项引发错误。当不存在partitionBy时追加将自动跟随现有数据的分区。 如果您将其他选项例如partitionBy与附加模式结合使用则Delta Lake会验证它们是否匹配并为任何不匹配项引发错误。当不存在partitionBy时追加将自动跟随现有数据的分区。 PS上面说append可以指定分区也可以不指定分区就是根据这个来的 指定了会验证是否匹配 不指定会自动跟随已有的设置 9.2.1. 测试修改Schema能否写入 继续使用上面的表 准备一个新数据 case class Goods2(name:String, price:Int, date:String, comment:String) val dataDF3 sc.makeRDD(List(Goods2(append3,5,1995-01-01, 测试新列1), Goods2(append4,10,1996-01-01, 测试新列2))).toDF # 执行追加 dataDF3.write.format(delta).mode(append).save(/tmp/delta/events)执行追加会报错提示 转存失败重新上传取消 两者Schema不匹配无法追加。 这个好理解再试一下overwrite呢 dataDF3.write.format(delta).mode(overwrite).save(/tmp/delta/events)得到同样的结果不可以执行。 9.2.2. 如何强制执行 报错提示我们 To enable schema migration, please set: .option(mergeSchema, true).那试验一下。 先试验append dataDF3.write.format(delta).mode(append).option(mergeSchema, true).save(/tmp/delta/events)OK 成功。 并且验证了表中存在但DataFrame中不存在的列设置为null。 这句话的正确。 再试验overwrite 由于Schema已经改为4个列的Goods2了那么创建几条新的Goods数据再覆盖回去 val dataDF4 sc.makeRDD(List(Goods(overwrite1,5,1995-01-01), Goods(overwrite2,10,1996-01-01))).toDF dataDF4.show# 执行覆盖 dataDF4.write.format(delta).mode(overwrite).option(mergeSchema, true).save(/tmp/delta/events) # 查看 spark.read.format(delta).load(/tmp/delta/events).show(30)转存失败重新上传取消 我们发现尽管我们用3个列的数据覆盖回4个列的表。 数据是成功覆盖进去了但是列并未消除这也说明mergeSchema操作可以增加列但是不会因为合并被删除列。 换一个彻底不同的Schema试试overwrite # 新schema case class Test(id:Int, info:String, date:String)scala val dataDF5 sc.makeRDD(List(Test(1, haha, 2000-01-01), Test(2, heihei, 2000-01-02))).toDF dataDF5: org.apache.spark.sql.DataFrame [id: int, info: string ... 1 more field]scala dataDF5.show ------------------- | id| info| date| ------------------- | 1| haha|2000-01-01| | 2|heihei|2000-01-02| -------------------# 执行覆盖 dataDF5.write.format(delta).mode(overwrite).option(mergeSchema, true).save(/tmp/delta/events) # 查看 spark.read.format(delta).load(/tmp/delta/events).show(30)转存失败重新上传取消 我们发现列还是没减少。 这就表明mergeSchema操作只会增加列不会删除列。 9.2.3. 那如何让Schema中列减少呢以当前Schema强制覆盖过去 使用overwriteSchema这个option 将上面的语句换成 dataDF5.write.format(delta).mode(overwrite).option(overwriteSchema, true).save(/tmp/delta/events)转存失败重新上传取消 我们发现Schema被成功的覆盖了。 9.3. 7.3 表更新、删除对Parquet数据文件的影响 9.3.1. 更新指定行 import io.delta.tables._ val deltaTable DeltaTable.forPath(spark, /tmp/delta/events) deltaTable.toDF.show# 表内容 scala deltaTable.toDF.show ----------------- | id|info| date| ----------------- | 1|haha|2000-01-01| -----------------# 修改info列的内容 deltaTable.updateExpr(info haha, Map(info - haha222)) # 不要遗漏haha222上的单引号 # 查看 scala deltaTable.toDF.show -------------------- | id| info| date| -------------------- | 1|haha222|2000-01-01| --------------------Tip使用分区列可以加快速度。 9.3.2. 删除指定行 import io.delta.tables._ val deltaTable DeltaTable.forPath(spark, /tmp/delta/events) deltaTable.toDF.show# 刚刚的表数据内容 scala deltaTable.toDF.show ------------------- | id| info| date| ------------------- | 2|heihei|2000-01-02| | 1| haha|2000-01-01| -------------------# 删除id为2的行 scala deltaTable.delete(id 2)scala deltaTable.toDF.show ----------------- | id|info| date| ----------------- | 1|haha|2000-01-01| -----------------注意 删除只是从最新版本中删除这一行但是并不会从物理存储中删除有版本控制的记录。 如果要删除物理存储需要使用vacuum方法。后面讲 Tip: 如果可能的话尽可能使用分区列来定位这样快。 更新表对重复的进行替换 继续使用刚刚的表 import io.delta.tables._ val deltaTable DeltaTable.forPath(spark, /tmp/delta/events) deltaTable.toDF.show# 表内容 scala deltaTable.toDF.show -------------------- | id| info| date| -------------------- | 1|haha222|2000-01-01| --------------------现在有需求有两条数据 1 replace 2000-01-022new2000-01-03 其中第一行id和已有数据重复需求是使用新数据的替换老的id为1的数据 第二行为新数据直接插入 预测结果为 idinfodate1replace2000-01-022new2000-01-03 执行 import io.delta.tables._ import org.apache.spark.sql.functions._val df sc.makeRDD(List(Test(1, replace, 2000-01-02), Test(2, new, 2000-01-03))).toDFdeltaTable.as(old).merge(df.as(new), old.id new.id).whenMatched.updateExpr(Map(info - new.info, date - new.date)).whenNotMatched.insertExpr(Map(id - new.id, info - new.info, date - new.date)).execute结果 scala deltaTable.toDF.show -------------------- | id| info| date| -------------------- | 1|replace|2000-01-02| | 2| new|2000-01-03| --------------------和预期的一致。 Tip: 如果在whenMatched的时候想要更新全部字段可以使用updateAll它等同updateExpr(Map(col1 - source.col1, col2 - source.col2, ...)) 同理在whenNotMatched的时候想要插入全部字段可以使用insertAll它等同于insertExpr(Map(col1 - source.col1, col2 - source.col2, ...)) 那么命令可以变为 deltaTable.as(old).merge(df.as(new), old.id new.id).whenMatched.updateAll.whenNotMatched.insertAll.execute注意如果源数据中有多行匹配上如id为1的有多行那么当whenMatched生效的时候不确定会使用哪一行这样通常会导致失败。 最好是在源数据表中清除这种模棱两可的内容。 也就是比如这个例子中的id列作为匹配列最好是唯一的不重复的。 如果您知道几天之内可能会得到重复的记录则可以通过按日期对表进行分区然后指定要匹配的目标表的日期范围来进一步优化查询。 比如上面的通过id进行匹配old.id new.id如果有时间列的话可以指定时间范围比如指定最近7天的数据 old.id new.id AND old.date current_date() - INTERVAL 7 DAYS 或者 old.id new.id AND old.date date_sub(current_date, 7) 命令最终如下 deltaTable.as(old).merge(df.as(new), old.id new.id AND old.date current_date() - INTERVAL 7 DAYS).whenMatched.updateAll.whenNotMatched.insertAll.execute9.4. 7.4 Delta Lake 表实用工具 9.4.1. Vacuum 您可以通过在表上运行vacuum命令来删除不再由Delta表引用的文件并且这些文件早于保留阈值。 Vacuum不会自动触发。文件的默认保留期限为7天。 也就是运行了Vacuum后7天以前的文件就被清除了。 注意使用了Vacuum后7天前的文件被清除同时版本控制也无法回退到7天之前的那些状态了。 慎用。 使用 import io.delta.tables._val deltaTable DeltaTable.forPath(spark, pathToTable)deltaTable.vacuum() // vacuum files not required by versions older than the default retention perioddeltaTable.vacuum(100) // vacuum files not required by versions more than 100 hours old转存失败重新上传取消 默认情况下vacuum不接受小于168的参数。 如果想要强制让参数小于168需要设置spark.databricks.delta.retentionDurationCheck.enabled false 可以设置在spark-default.conf文件中永久生效 当设置好了后再执行 转存失败重新上传取消 scala deltaTable.vacuum(1) Deleted 28 files and directories in a total of 24 directories. res1: org.apache.spark.sql.DataFrame []可以看出从总共24个目录里面删除了28个文件和目录。 这样这个表就只保留了最近1小时的版本 注意不是精确到秒的1小时。因为版本之间的更新间隔不确定导致有的数据文件是有一定跨度的版本上属于1小时内的版本但是其数据文件可能是1.5小时前创建的。 也就是这样做了之后大致上2小时以前的文件是没了的。 官方建议我们最好不要让保留期小于7天。 并且不要关闭spark.databricks.delta.retentionDurationCheck.enabled除非你有必要否则不要动它 vacuum是一个很暴力的操作谨慎使用。 Tip vacuum(0) 表示除了最新版本以外历史版本全部丢弃。 9.4.2. 历史 可以通过运行history命令来获取有关每次写入Delta表的操作用户时间戳等信息。以相反的时间顺序返回操作。默认情况下表历史记录会保留30天。 import io.delta.tables._val deltaTable DeltaTable.forPath(spark, /tmp/delta/events)val fullHistoryDF deltaTable.history() // get the full history of the tableval lastOperationDF deltaTable.history(1) // get the last operation# 查看最新一次操作 scala lastOperationDF.show --------------------------------------------------------------------------------------------------------------------------------- |version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend| --------------------------------------------------------------------------------------------------------------------------------- | 11|2020-02-13 00:39:...| null| null| MERGE|[predicate - ((o...|null| null| null| 10| null| false| ---------------------------------------------------------------------------------------------------------------------------------查看完整的操作记录可以查看fullHistoryDF的内容 转存失败重新上传取消 9.4.3. 生成 您可以为Delta表生成清单文件其他处理引擎即Apache Spark除外可以使用清单文件来读取Delta表。例如要生成可被Presto用来读取Delta表的清单文件可以运行以下命令 val deltaTable DeltaTable.forPath(pathToDeltaTable) deltaTable.generate(symlink_format_manifest)执行完成后会在表目录下生产对应的文件 转存失败重新上传取消 文件内容 [rootst1 resources]# hadoop fs -cat /tmp/delta/events/_symlink_format_manifest/manifest|more hdfs://st1:8020/tmp/delta/events/part-00000-6028256d-6bd1-4282-adbe-3d02908d2f55-c000.snappy.parquet hdfs://st1:8020/tmp/delta/events/part-00000-b12d24d0-1430-48ab-ad66-f0f2e838ba9f-c000.snappy.parquet hdfs://st1:8020/tmp/delta/events/part-00043-ba131274-7728-4847-8fcf-1913d0918a0a-c000.snappy.parquet hdfs://st1:8020/tmp/delta/events/part-00000-60aaa505-d198-495b-ad98-31694c9adc8b-c000.snappy.parquet hdfs://st1:8020/tmp/delta/events/part-00043-f7025921-5307-415f-b465-96dbdfd19cf8-c000.snappy.parquet hdfs://st1:8020/tmp/delta/events/part-00174-0133c321-427f-4bc9-ac83-bf74bc94e5a2-c000.snappy.parquet hdfs://st1:8020/tmp/delta/events/part-00043-4f761893-4fd2-49ab-8fbb-8a5d74116c18-c000.snappy.parquet hdfs://st1:8020/tmp/delta/events/part-00174-0bc54e7f-a709-4a29-9fac-0c34355744dd-c000.snappy.parquet hdfs://st1:8020/tmp/delta/events/part-00000-7bcc9e25-9b78-41e6-9148-b84bff427cc2-c000.snappy.parquet hdfs://st1:8020/tmp/delta/events/part-00000-ca6fbb61-54fb-4a57-a434-15883e78bbe1-c000.snappy.parquet hdfs://st1:8020/tmp/delta/events/part-00000-ed4847b5-1743-4f14-a762-f45f5bff739c-c000.snappy.parquet hdfs://st1:8020/tmp/delta/events/part-00174-56ea9aaa-ca4a-4e30-8eb5-76d7de9bb0bd-c000.snappy.parquet这个操作主要帮助我们集成Presto以及AWS Athena 可以参考Presto and Athena to Delta Lake integration — Delta Lake Documentation 9.4.4. 将Delta Lake的表转换为普通的Parquet表 您可以通过以下步骤轻松地将Delta表转换回Parquet表 如果您执行了可以更改数据文件的Delta Lake操作例如“删除”或“合并”则首先运行vacuum(0) 以删除不属于该表的最新版本的所有数据文件。删除 _delta_log这个目录 ​ 我们来尝试一下将/tmp/delta/events这个表转换为普通的parquet表 import io.delta.tables._ val dt DeltaTable.forPath(/tmp/delta/events) dt.toDF.show在执行vacuum之前 转存失败重新上传取消 执行vacuum(0) scala dt.vacuum(0) Deleted 26 files and directories in a total of 24 directories. res7: org.apache.spark.sql.DataFrame []已经清除历史版本 转存失败重新上传取消 现在只要把_delta_log删除后就成为了普通的parquet表了。 执行 hadoop fs -rm -r /tmp/delta/events/_delta_log # 把刚刚生成的manifest也删除 hadoop fs -rm -r /tmp/delta/events/_symlink_format_manifest这样就成为普通的parquet表了 转存失败重新上传取消 9.4.5. 将普通的parquet表转换为Delta 将现有的Parquet表就地转换为 Delta 表。 该命令列出目录中的所有文件创建一个 Delta Lake 事务日志来跟踪这些文件并通过读取所有 Parquet 文件的页脚自动推断数据模式。 如果数据已分区则必须指定分区列的架构。 import io.delta.tables._// Convert unpartitioned parquet table at path /path/to/table val deltaTable DeltaTable.convertToDelta(spark, parquet./path/to/table)// Convert partitioned parquet table at path /path/to/table and partitioned by integer column named part val partitionedDeltaTable DeltaTable.convertToDelta(spark, parquet./path/to/table, part int)示范将刚刚得到的parquet表转换为delta表 import io.delta.tables._ val deltaTable DeltaTable.convertToDelta(spark, parquet./tmp/delta/events)scala val deltaTable DeltaTable.convertToDelta(spark, parquet./tmp/delta/events) deltaTable: io.delta.tables.DeltaTable io.delta.tables.DeltaTable7d36eb0scala deltaTable.toDF.show -------------------- | id| info| date| -------------------- | 1|replace|2000-01-02| | 1|replace|2000-01-02| | 1|replace|2000-01-02| | 2| new|2000-01-03| | 2| new|2000-01-03| | 2| new|2000-01-03| --------------------转存失败重新上传取消 查看HDFS_delta_log也回来了。 只不过这个由parquet转换来的表是没有历史版本的当前就是版本0。 9.5. 7.5 Delta Lake 阶段总结 95%的代码都是SparkSQL的写法一模一样。DataSet DataFrame来发起SparkSQL的逻辑现在由DeltaTable这个对象发起逻辑。发起逻辑之后的一系列操作依旧和SparkSQL的写法一样。对表的追加以及覆盖等操作DataFrame.write.format(delta).save().option(mergeSchema, overwriteSchema) DeltaLake 就是在SparkSQl或者Spark上的一个 插件(Lib库一堆jar包) 已经使用Spark SparkSQL 来分析数据或者说构建数据湖的同学们只需要对代码进行少量修改就可以使用DeltaLake了 那么从0开始使用DeltaLake来开发SparkSparkSQL等工程也是很简单的。 95%的代码 依旧是Spark代码。 9.6. 7.6 其他存储系统的配置 Delta Lake 默认采用了HDFS的实现其默认的LogStore是 spark.delta.logStore.classorg.apache.spark.sql.delta.storage.HDFSLogStore同时Delta Lake也支持AWS S3以及微软Azure的存储 9.6.1. AWS - S3 您可以在S3上创建读取和写入Delta表。Delta Lake支持从多个集群进行并发读取但是对S3的并发写入必须源自单个 Spark驱动程序以便Delta Lake提供事务保证。 9.6.1.1. 要求 S3凭据IAM角色推荐或访问密钥Apache Spark 2.4.2及更高版本Delta Lake 0.2.0及以上 9.6.1.2. 快速开始 这是有关如何开始在S3上读写Delta表的快速指南。有关配置的详细说明请参阅下一节。 使用以下命令来启动具有Delta Lake和S3支持的Spark Shell假设您使用针对Hadoop 2.7预先构建的Spark 2.4.3 bin/spark-shell \--packages io.delta:delta-core_2.11:0.2.0,org.apache.hadoop:hadoop-aws:2.7.7 \--conf spark.delta.logStore.classorg.apache.spark.sql.delta.storage.S3SingleDriverLogStore \--conf spark.hadoop.fs.s3a.access.keyyour-s3-access-key \--conf spark.hadoop.fs.s3a.secret.keyyour-s3-secret-key在S3在Scala中中尝试一些基本的Delta表操作 // Create a Delta table on S3: spark.range(5).write.format(delta).save(s3a://your-s3-bucket/path/to/delta-table)// Read a Delta table on S3: spark.read.format(delta).save(s3a://your-s3-bucket/path/to/delta-table)有关其他语言和Delta表操作的更多示例请参见Delta Lake快速入门。 9.6.1.3. S3的配置 以下是为S3配置Delta Lake的步骤。 配置LogStore实施。 设置spark.delta.logStore.class Spark配置属性 spark.delta.logStore.classorg.apache.spark.sql.delta.storage.S3SingleDriverLogStore顾名思义S3SingleDriverLogStore仅当所有并发写入均来自单个Spark驱动程序时该实现才能正常工作。这是一个应用程序属性必须在启动之前设置SparkContext并且在上下文的生存期内不能更改。 hadoop-aws在类路径中包含JAR。 Delta Lake需要软件包中的org.apache.hadoop.fs.s3a.S3AFileSystem类该hadoop-aws软件包FileSystem为S3 实现Hadoop的API。确保此软件包的版本与构建Spark的Hadoop版本匹配。 设置S3凭据。 我们建议使用IAM角色进行身份验证和授权。但是如果要使用密钥这是一种方法在Scala中设置Hadoop配置 sc.hadoopConfiguration.set(fs.s3a.access.key, your-s3-access-key) sc.hadoopConfiguration.set(fs.s3a.secret.key, your-s3-secret-key)9.6.2. Microsoft Azure存储 您可以在Azure Blob存储和Azure Data Lake Storage Gen1上创建读取和写入Delta表。Delta Lake支持来自多个群集的并发写入。 9.6.2.1. 要求 Delta Lake依靠Hadoop FileSystemAPI来访问Azure存储服务。具体来说Delta Lake需要将实现实现为FileSystem.rename()原子的只有较新的Hadoop版本Hadoop-15156和Hadoop-15086才支持该实现。因此您可能需要使用较新的Hadoop版本构建Spark。以下是要求列表 Azure Blob存储 甲共享密钥或共享访问签名SAS使用Hadoop版本2.9.1及更高版本不是3.x构建的Spark 2.4.2及更高版本Delta Lake 0.2.0及以上 Azure Data Lake Storage Gen1 甲服务主要用于OAuth 2.0用户访问使用Hadoop版本2.9.1及更高版本不是3.x构建的Spark 2.4.2及更高版本Delta Lake 0.2.0及以上 请参阅“ 指定Hadoop版本并启用YARN ”以使用特定的Hadoop版本构建Spark以及“ Delta Lake快速入门”以使用Delta Lake设置Spark。 9.6.2.2. 快速开始 这是在Azure Data Lake Storage Gen1上设置Delta Lake的快速指南。有关Azure Data Lake Storage Gen1和Azure Blob存储的详细配置请参阅后续部分。 使用ADLS凭据从Spark主目录启动Spark Shell假设您的Spark是使用Scala 2.11和Hadoop 2.9.2构建的 bin/spark-shell \--packages io.delta:delta-core_2.11:0.2.0,org.apache.hadoop:hadoop-azure-datalake:2.9.2 \--conf spark.delta.logStore.classorg.apache.spark.sql.delta.storage.AzureLogStore \--conf spark.hadoop.dfs.adls.oauth2.access.token.provider.typeClientCredential \--conf spark.hadoop.dfs.adls.oauth2.client.idyour-oauth2-client-id \--conf spark.hadoop.dfs.adls.oauth2.credentialyour-oauth2-credential \--conf spark.hadoop.dfs.adls.oauth2.refresh.urlhttps://login.microsoftonline.com/your-directory-id/oauth2/token在ADLS Gen 1上尝试一些基本的Delta表操作 // Create a Delta table on ADLS Gen 1: spark.range(5).write.format(delta).save(adl://your-adls-account.azuredatalakestore.net/path/to/delta-table)// Read a Delta table on ADLS Gen 1: spark.read.format(delta).load(adl://your-adls-account.azuredatalakestore.net/path/to/delta-table)有关其他语言和Delta表操作的更多示例请参见Delta Lake快速入门。 9.6.2.3. 配置Azure Data Lake Storage Gen1 以下是在Azure Data Lake Storage Gen1上配置Delta Lake的步骤。 配置LogStore实施。 设置spark.delta.logStore.class Spark配置属性 spark.delta.logStore.classorg.apache.spark.sql.delta.storage.AzureLogStore该AzureLogStore实现适用于Azure中的所有存储服务并支持多集群并发写入。这是一个应用程序属性必须在启动之前设置SparkContext并且在上下文的生存期内不能更改。 hadoop-azure-datalake在类路径中包含JAR。Delta Lake的Hadoop需要2.9.1版本而Hadoop则需要3.0.1版本。请确保用于此软件包的版本与构建Spark的Hadoop版本匹配。 设置Azure Data Lake Storage Gen1凭据。 您可以使用凭据在Scala中设置以下Hadoop配置 sc.hadoopConfiguration.set(dfs.adls.oauth2.access.token.provider.type, ClientCredential) sc.hadoopConfiguration.set(dfs.adls.oauth2.client.id, your-oauth2-client-id) sc.hadoopConfiguration.set(dfs.adls.oauth2.credential, your-oauth2-credential) sc.hadoopConfiguration.set(dfs.adls.oauth2.refresh.url, https://login.microsoftonline.com/your-directory-id/oauth2/token)9.6.2.4. 配置Azure Blob存储 以下是在Azure Blob存储上配置Delta Lake的步骤。 配置LogStore实施。 设置spark.delta.logStore.class Spark配置属性 spark.delta.logStore.classorg.apache.spark.sql.delta.storage.AzureLogStore该AzureLogStore实现适用于所有Azure存储服务并支持多集群并发写入。这是一个应用程序属性必须在启动之前设置SparkContext并且在上下文的生存期内不能更改。 hadoop-azure在类路径中包含JAR。Delta Lake的Hadoop需要2.9.1版本而Hadoop则需要3.0.1版本。请确保用于此软件包的版本与构建Spark的Hadoop版本匹配。 设置凭据。 您可以在Spark配置属性中设置凭据。 我们建议您使用SAS令牌。在Scala中您可以使用以下命令 spark.conf.set(fs.azure.sas.your-container-name.your-storage-account-name.blob.core.windows.net,complete-query-string-of-your-sas-for-the-container)或者您可以指定一个帐户访问密钥 spark.conf.set(fs.azure.account.key.your-storage-account-name.blob.core.windows.net,your-storage-account-access-key)访问您的ABS帐户上的数据。使用Delta Lake在您的ABS帐户上读写数据。例如在Scala中 spark.write.format(delta).save(wasbs://your-container-nameyour-storage-account-name.blob.core.windows.net/path/to/delta-table) spark.read.format(delta).load(wasbs://your-container-nameyour-storage-account-name.blob.core.windows.net/path/to/delta-table)有关其他语言和Delta表操作的更多示例请参见Delta Lake快速入门。 10. 八、Delta Lake - 理论[理解] 学习步骤 理解事务日志在Delta Lake中的重要性理解Delta Lake中关于Schema验证和演化的相关概念了解Delta Lake的最佳实践 10.1. 1. 理解Delta Lake的事务日志 事务日志是理解Delta Lake的关键因为它贯穿其许多最重要功能包括 ACID事务可伸缩的元数据处理时间旅行等。 在本文中我们将探讨什么是Delta Lake事务日志它在文件级别的工作方式以及如何为多次并发读写问题提供一个优雅的解决方案。 10.1.1. 1.1 什么是事务日志 Delta Lake事务日志也称为DeltaLog是自Delta Lake表创建以来已执行过的每个事务的有序记录。 我们在前面已经有所接触就是每个Delta表的_delta_log文件夹内的那些JSON数据文件。 10.1.2. 1.2 事务日志如何工作 10.1.2.1. 1.2.1 将事务分解成原子提交 每当用户执行修改表的操作例如INSERTUPDATE或DELETE时Delta Lake都会将该操作分解为由以下一个或多个操作组成的一系列离散步骤。 添加文件 –添加数据文件。删除文件 –删除数据文件。更新元数据 –更新表的元数据例如更改表的名称架构或分区。设置事务 –记录结构化流作业已使用给定ID提交了一个微批处理。更改协议 –通过将Delta Lake事务日志切换到最新的软件协议来启用新功能。提交信息 –包含有关提交从何处在什么时间进行了哪些操作的信息。 然后将这些操作作为有序的原子单位称为提交记录在事务日志中。 例如假设用户创建一个事务以将新列添加到表中并向其中添加更多数据。Delta Lake会将交易分解为各个组成部分一旦交易完成请按以下提交将其添加到交易日志中 更新元数据–更改架构以包括新列添加文件–为每个添加的新文件 10.1.2.2. 1.2.2 文件级别的Delta Lake事务日志 用户创建Delta Lake表时该表的事务日志将自动在_delta_log子目录中创建 当对该表进行更改时这些更改将按顺序记录在事务日志中并且是原子提交。每次提交均以JSON文件的形式写出如000000.json。对表的其他更改将按数字升序生成后续的JSON文件以便将下一个提交写为000001.json将其写为000002.json依此类推。 转存失败重新上传取消 如果也许我们改变了主意决定删除这些文件并改为添加一个新文件。这些操作将被记录为事务日志中的下一次提交如下所示 转存失败重新上传取消 如图第二次操作删除了2个文件被记录在000001.json中。安装前面学习的版本回退操作000001.json也代表的是version1version从0开始算 尽管我们在事务日志中体现了这两个文件被删除但是在磁盘上这两个文件依旧会保留。因为要做到版本管理。 如果确实想删除需要使用vacuum函数。 10.1.2.3. 1.2.3 使用检查点文件快速重新计算状态 一旦我们对事务日志总共提交了10次提交Delta Lake将以Parquet格式将检查点文件保存在同一_delta_log子目录中。Delta Lake每10次提交会自动生成一个检查点文件。 转存失败重新上传取消 这些检查点文件会在某个时间点保存表的整个状态-以本机Parquet格式保存Spark可以轻松快速地读取它们。 这样做的好处在于如果我们要回滚到某个版本完全不需要从版本0开始一个版本一个版本的推导。 而是直接跳到最近的检查点从检查点开始推导。 比如当前的最新版本是15如果想要回退到版本12如果没有检查点的话就需要从版本0到版本1到版本2一直到版本12重现一遍计算得出版本12的内容。 但是有了检查点之后就简单多了按照10次提交生成检查点文件那么在版本10的时候就有一个检查点对于回退到版本12的操作就变成了 回退到检查点版本10从版本10开始推导进入版本11再推导进入版本12即可完成版本12的回退操作 可以说检查点就是某个版本的快照。 这些都是Delta Lake自动操作的无需我们干预。 10.1.2.4. 1.2.4 处理多个并发读取和写入 由于Delta Lake由Apache Spark提供支持因此多个用户有可能同时修改表。为了处理这些情况Delta Lake采用了乐观并发控制 10.1.2.4.1. 乐观并发控制 乐观并发控制是一种处理并发事务的方法该方法假定不同用户对表进行的事务更改可以完成而不会相互冲突。之所以如此之快是因为在处理PB级数据时用户极有可能会完全处理数据的不同部分从而使他们能够同时完成无冲突的事务。 例如假设您和我正在一起研究拼图游戏。只要我们俩都在研究它的不同部分例如您在角落而我在边缘就没有理由为什么我们不能同时为更大难题的一部分工作并以两倍快的速度完成拼图。只有当我们需要相同的零件时才会出现冲突。那就是乐观的并发控制。 也就是在大数据下能让用户之间起冲突的概率不高多数情况下因为数据规模大你处理你那部分我处理我的部分。 当然即使采用了乐观的并发控制有时用户的确会尝试同时修改数据的相同部分。对此Delta Lake对此有一个协议。 10.1.2.4.2. 乐观的解决冲突 转存失败重新上传取消 用户1和2都试图同时向表中添加一些数据。在这里我们陷入了冲突因为下一次只能提交一次并记录为000001.json。Delta Lake通过“互斥”的概念解决了这一冲突这意味着只有一个用户可以成功进行提交000001.json。也就是假设接受用户1的提交而拒绝用户2的提交。但是Delta Lake宁愿乐观地处理此冲突也不愿为User 2引发错误。它检查是否对表进行了任何新的提交或者说检查是否有冲突并以静默方式更新表以反映这些更改然后简单地在新提交的表上重试用户2的提交不进行任何数据处理并成功提交000002.json。 在大多数情况下这种和解是无声无缝且成功地进行的。但是如果存在无法解决的问题Delta Lake无法乐观地解决例如如果用户1删除了用户2也删除的文件则唯一的选择是抛出错误。 10.1.3. 1.3 其他用例 10.1.3.1. 1.3.1 时间旅行 每个表都是Delta Lake事务日志中记录的所有提交的总和的结果–不多也不少。事务日志提供了逐步的指导详细说明了如何从表的原始状态变为当前状态。 也就是从版本0开始根据元数据进行逐步推导当推导到最后的时候和当前最新版本的结果肯定是一样的。 因此我们可以通过从原始表开始在任何时间点重新创建表的状态并且只处理该点之前的提交。这种强大的功能被称为“时间旅行”或数据版本控制。 10.1.3.2. 1.3.2 数据审查 由于事务日志记录的每一次对数据的操作那么对数据进行审查就有据可依了。 这个功能对部分企业来说十分重要。 10.2. 2. 模式验证和演变 数据就像我们的经验一样总是在不断发展和积累。为了跟上步伐我们的数据模型必须适应新的数据。 这就带来了挑战Schema的更改在任何数据产品中都是重量级的操作在Delta Lake中也是一样。 我们在前面演示过如何去合并Schema以及如何强制的覆盖Schema。 这些操作遵循Delta Lake在Scheam演变中的设计思想 10.2.1. 2.1 了解表架构 Apache Spark™中的每个DataFrame都包含一个架构一个定义数据形状的蓝图例如数据类型和列以及元数据。 使用Delta Lake表的架构以JSON格式保存在事务日志中。 也就是_delta_log这个文件夹内的事务日志不仅仅保存数据的更改记录同时也保存着数据的Schema记录。 所以我们一直称之为元数据 10.2.2. 2.2 模式验证 在第七章第二节7.2 Schema验证中我们演示了关于Schema更改的一系列操作。 这些操作的背后是遵循如下的理论的。 模式验证是Delta Lake中的一种安全措施它通过拒绝对表的模式不匹配的写入来确保数据质量。就像忙碌的餐厅的前台经理只接受预订一样它会检查插入表中的数据中的每一列是否在其预期列的列表中换句话说每一列是否都有“预订”以及拒绝所有不在列表中的列的写操作。 case class Test1(id:Int, info:String) case class Test2(id:Int, info:String, comment:String)import io.delta.tables._ val df1 sc.makeRDD(List(Test1(1, haha), Test1(2, heihei))).toDF df1.showdf1.write.format(delta).save(/tmp/delta/sv)val df2 sc.makeRDD(List(Test2(1, haha, Test2), Test2(2, heihei, Test2))).toDF# 当我们执行 df2.write.format(delta).mode(append).save(/tmp/delta/sv)转存失败重新上传取消 模式验证就会自动工作帮助我们拒绝Schema不匹配的工作。 我们可以使用mergeSchema和overwriteSchema来进行 mergeSchema Schema合并增加列不会删除列overwriteSchema 按新Schema强制覆盖 10.2.3. 2.3 模式验证如何工作 Delta Lake 在写数据上使用架构验证这意味着在写入时会检查对表的所有新写入是否与目标表的架构兼容。如果架构不兼容则Delta Lake将完全取消事务不写入任何数据并引发异常以使用户知道不匹配的情况。 为了确定对表的写入是否兼容Delta Lake使用以下规则。要写入的DataFrame 不能包含目标表的架构中不存在的任何其他列。相反如果传入的数据不包含表中的每一列则可以-这些列将被简单地分配为空值。列数据类型不能与目标表中的列数据类型不同。如果目标表的列包含StringType数据但DataFrame中的相应列包含IntegerType数据则模式强制实施将引发异常并阻止进行写操作。不能包含仅大小写不同的列名。这意味着您不能在同一表中定义诸如“ Foo”和“ foo”之类的列。虽然Spark可用于区分大小写或不区分大小写默认模式但Delta Lake保留大小写但在存储架构时不区分大小写。存储和返回列信息时Parquet区分大小写。为了避免潜在的错误数据损坏或丢失问题添加了此限制。 10.2.4. 2.4 模式验证有何用处 由于执行了如此严格的检查因此模式强制实施是用作准备好用于生产或使用的干净完全转换的数据集。通常可以帮助于 机器学习算法强Schema要求BI仪表板数据分析和可视化工具任何需要高度结构化强类型语义模式的生产系统 为了为业务提供数据多数企业选择多跳的模式也就是迭代中间数据。 在这个过程中的模式验证是非常有必要的。 这避免了我们将原以为干净的中间数据变成脏数据 10.2.5. 2.5 模式演变 模式演变是一项功能使用户可以轻松更改表的当前模式以适应随时间变化的数据。 也就是我们前面学习的两个操作属性 mergeSchemaoverwriteSchema 10.2.5.1. 有什么用 计划 更改可以在您打算更改表的计划时使用与您不小心将不应该存在的列添加到DataFrame的位置相反。这是迁移架构的最简单方法因为它会自动添加正确的列名称和数据类型而无需显式声明它们。 通常来说我们推荐使用mergeSchema而不是overwriteSchema 除非你很有必要不然多数情况下合并架构是一种最优的选择 10.3. 3. Delta Lake 最佳实践 10.3.1. 3.1 选择合适的分区列 你可以对Delta Lake的表进行分区存储最常见的分区当然就是时间了。 请遵循以下经验法则来决定要按哪个分区进行分区 如果列的基数不重复数量很高请不要使用该列进行分区。例如如果您按一列userId进行分区并且可以有1M个不同的用户ID则这是一个不好的分区策略。最好选择重复数量较多的数据列进行分区比如日期随着数据增长分区不一定能适应新的数据情况可以在空闲的时候尝试重新分区。 10.3.2. 3.2 合并文件compact 我们应该还记得在HBase中的compact操作。可以帮组我们对小HFile文件进行合并合并成大的HFile文件来提高性能。 对于Delta Lake也可以如此。 随着表版本的增加其存储中会产生许多的parquet文件那么在这个时候对这些零散的parquet文件进行合并可以显著的提高执行性能。 示范 import scala.util.Random case class Test(id:Int, info:String) val df sc.makeRDD(List(Test(Random.nextInt(10000), haha))).toDF df.write.format(delta).save(/tmp/delta/compact) df.show# 连续append20次 1 to 20 foreach(x sc.makeRDD(List(Test(Random.nextInt(10000), haha))).toDF.write.format(delta).mode(append).save(/tmp/delta/compact))spark.read.format(delta).load(/tmp/delta/compact).toDF.show# 查看parquet文件数量 hadoop fs -ls /tmp/delta/compact转存失败重新上传取消 当前目录下有69个项目。 我们来执行以下compact操作 val path /tmp/delta/compact val numFiles 8spark.read.format(delta).load(path).repartition(numFiles).write.option(dataChange, false).format(delta).mode(overwrite).save(path)将分区压缩为8个在看下结果 转存失败重新上传取消 项目为77个说明增加了8个文件正好就是我们设置的压缩文件数8 不要理解为将所有的parquet都压缩为8个。并不是。 我们的compact也是对表的一次操作也是更新了一次版本。 这个压缩数量8的意思是表示最新版本以8个parquet保存。 历史的那些哪怕数量再多如果我们不执行回退操作的话对我们来说就是无用的文件。 所以对当前最新版本有用的就是那8个新增的parquet了。 10.4. 总结 Delta Lake是一款存储层框架是对Spark、SparkSQL的一个增强。 它赋予我们在执行多跳架构迭代中间数据进行数据利用的过程中 事务控制乐观并发解决冲突版本控制数据审查Schema验证Schema演化合并、覆盖等增强功能 极大的增强了企业在数据湖数据利用方面的效率节省了开发人员的时间。 我们再来回看这个图 转存失败重新上传取消 应该能够理解Delta Lake就是构建在存储层HDFS、S3、微软云存储等之上 为中间数据迭代Lambda架构、多跳架构提供增强功能也支撑。 正如图中所描述的 转存失败重新上传取消 数据利用就是 从青铜原始数据结构化、半结构化、非结构化进行处理走到白银中间数据基本上中间数据都是结构化数据或者严格的半结构化数据。一般中间数据不会有非结构化的了最终成为黄金业务可用数据 的一系列流程Delta Lake为这个流程保驾护航增强功能提高企业效率。 11. 九、企业数据湖应用案例分析[实操一遍] 目标 学习如何在IDEA中开发集成Delta Lake功能的SparkJOB 学习步骤 理解企业数据湖应用的需求分析完成SparkJob的开发并集成DeltaLake功能 我们已经完成了关于Delta Lake的 概念学习快速入门常见操作和理论学习 在这一章我们模拟一个企业的离线统计分析需求来演示一下如何在IDEA项目中使用Delta Lake 11.1. 1. 需求分析 假设我们是一家做搜索引擎的公司每天有许多人登陆网站进行搜索操作。 这些用户行为统统被记录为日志存储并被统一采集Flume源源不断的输入到HDFS中进行存储。 数据格式 访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL其中用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值即同一次使用浏览器输入的不同查询对应同一个用户ID 现在公司的需求 政府进行合作要求进行舆情监测基于搜索引擎的搜索关键词为政府提供社会舆情分析报告。 每小时TOP10全天热门舆情排行 转存失败重新上传取消 假设我们现在是在一个已经搭建好的数据湖平台之上关于 数据输入数据应用数仓、Mysql等数据展现数据管理安全、权限等等 均已是开发完成了的我们要做的就是在这个已存在数据湖平台之上 开发SparkJob应用DeltaLake完成需求的开发 关于数据湖的构建我们前面说过其就是普通的大数据开发平台只不过根据用途不同其规模、定位、以及所要完成的工作是不同的。 那么构建数据湖就不在课程里面讲解了因为就算是讲解还是 搭建Hadoop搭建Spark构建Kafka、Flume等数据管道准备数据应用数仓、Mysql做BI等等这一套大数据平台的构建过程 那就和我们课程的主旨不符了。 假设公司已有数据湖只不过呢没有应用DeltaLake 11.2. 2. 需求实现 数据文件搜狗实验室Sogou Labs 提供有一个月的完整数据 课程中使用的是精简版1天的数据 修复格式错误后的链接百度网盘 请输入提取码 提取码r6qk 前置要求 同学们需要有一个可用的Hadoop、YARN、Spark的应用环境 项目用到的POM文件 PS: 具体的软件版本请根据你个人的来修改 但是Scala必须是2.11 以及Spark也必须是基于Scala 2.11的 同时Spark版本要大于等于2.4.2 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIditcast.cn/groupIdartifactIdDeltaLakeDemo/artifactIdversion1.0/versionrepositoriesrepositoryidaliyun/idurlhttp://maven.aliyun.com/nexus/content/groups/public//url/repositoryrepositoryidcloudera/idurlhttps://repository.cloudera.com/artifactory/cloudera-repos//url/repositoryrepositoryidjboss/idurlhttp://repository.jboss.com/nexus/content/groups/public/url/repository/repositoriespropertiesmaven.compiler.source1.8/maven.compiler.sourcemaven.compiler.target1.8/maven.compiler.targetencodingUTF-8/encodingscala.version2.11.8/scala.versionscala.compat.version2.11/scala.compat.versionhadoop.version2.6.0-cdh5.14.0/hadoop.versionspark.version2.4.5/spark.version/propertiesdependenciesdependencygroupIdio.delta/groupIdartifactIddelta-core_2.11/artifactIdversion0.5.0/version/dependencydependencygroupIdorg.scala-lang/groupIdartifactIdscala-library/artifactIdversion${scala.version}/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-core_2.11/artifactIdversion${spark.version}/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql_2.11/artifactIdversion${spark.version}/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion${hadoop.version}/version/dependencydependencygroupIdcom.typesafe/groupIdartifactIdconfig/artifactIdversion1.3.3/version/dependency/dependenciesbuildsourceDirectorysrc/main/scala/sourceDirectorytestSourceDirectorysrc/test/scala/testSourceDirectoryplugins!-- 指定编译java的插件 --plugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversion3.5.1/version/plugin!-- 指定编译scala的插件 --plugingroupIdnet.alchim31.maven/groupIdartifactIdscala-maven-plugin/artifactIdversion3.2.2/versionexecutionsexecutiongoalsgoalcompile/goalgoaltestCompile/goal/goalsconfigurationargsarg-dependencyfile/argarg${project.build.directory}/.scala_dependencies/arg/args/configuration/execution/executions/pluginplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-surefire-plugin/artifactIdversion2.18.1/versionconfigurationuseFilefalse/useFiledisableXmlReporttrue/disableXmlReportincludesinclude**/*Test.*/includeinclude**/*Suite.*/include/includes/configuration/pluginplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion2.3/versionexecutionsexecutionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationfiltersfilterartifact*:*/artifactexcludesexcludeMETA-INF/*.SF/excludeexcludeMETA-INF/*.DSA/excludeexcludeMETA-INF/*.RSA/exclude/excludes/filter/filterstransformerstransformerimplementationorg.apache.maven.plugins.shade.resource.ManifestResourceTransformermainClass/mainClass/transformer/transformers/configuration/execution/executions/plugin/plugins/build /project11.2.1. 2.1 转换原始数据生成基础表 我们先生成一份基础数据对时间进行多字段显示方便后面使用字段为 date:String, hour:Int, minute:Int, userID:String, topic:String, resultRank:Int, clickRank:Int, url:String代码实现 package cn.itcast.deltalake.demoimport java.text.SimpleDateFormat import java.util.Dateimport org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession/*** 作者传智播客* 描述将原始格式进行时间细粒度划分最小粒度划分为分钟*/case class OutputDefine(date:String, hour:Int, minute:Int, userID:String, topic:String, resultRank:Int, clickRank:Int, url:String) object OriginTransform {def main(args: Array[String]): Unit {val originFilePath args(0)val outputTablePath args(1)val dateSDF new SimpleDateFormat(yyyy-MM-dd)val standardSDF new SimpleDateFormat(yyyy-MM-dd HH:mm:ss)val hourSDF new SimpleDateFormat(HH)val minuteSDF new SimpleDateFormat(mm)// 由于原始数据中没有日期我们假设为今天的数据val today dateSDF.format(new Date)val spark SparkSession.builder().appName(OriginTransform).getOrCreate()val sc: SparkContext spark.sparkContextimport spark.implicits._// Read origin data.val lineRDD: RDD[Array[String]] sc.textFile(originFilePath).map(line line.split(\t))// 处理时间并写入到样例类OutputDefine中val resultRDD lineRDD.map(x {// 拼接今天的日期 和原始数据中的时分秒转换为时间戳val ts standardSDF.parse(today x(0)).getTimeval hour hourSDF.format(new Date(ts)).toIntval minute minuteSDF.format(new Date(ts)).toInt// case class OutputDefine(date:String, hour:Int, minute:Int, userID:String, topic:String, resultRank:Int, clickRank:Int, url:String)val userID x(1)val topic x(2).replace([, ).replace(], )val resultRank x(3).toIntval clickRank x(4).toIntval url x(5)try{OutputDefine(today, hour, minute, userID, topic, resultRank, clickRank, url)}catch {case e:Exception {e.printStackTrace()throw new Exception(sorigin: $userID, $topic, $url)}}})// 转换为Dataframe 写入Delta Lake TableresultRDD.toDF.write.format(delta).save(outputTablePath)spark.close()} }提交Spark任务到YARN /root/soft/spark-2.4.5-bin-hadoop2.6/bin/spark-submit --class cn.itcast.deltalake.demo.OriginTransform --master yarn --deploy-mode client /root/DeltaLakeDemo-1.0.jar /data.txt /delta/silver/basic_minute为了方便观察日志这里提交到YARN使用的是客户端模式而非集群模式 完成后在Spark-Shell中查看结果 转存失败重新上传取消 11.2.2. 2.2 添加新列到基础表 现在突然有一个问题就是我们这个数据表里面虽然有时间但是处理起来不方便。 如果想要得到时间戳需要将日期、小时、和分钟拼接起来来解析比较不利于后面对于时间的操作。 也就是我们忘记把时间戳作为字段加入到表中了。 对刚刚的代码的逻辑进行相应修改代码如下 package cn.itcast.deltalake.demoimport java.text.SimpleDateFormat import java.util.Dateimport org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession/*** 作者传智播客* 描述将原始格式进行时间细粒度划分最小粒度划分为分钟*/case class OutputDefine2(ts: Long, date: String, hour: Int, minute: Int, userID: String, topic: String, resultRank: Int, clickRank: Int, url: String) object OriginTransform2 {def main(args: Array[String]): Unit {val originFilePath args(0)val outputTablePath args(1)val dateSDF new SimpleDateFormat(yyyy-MM-dd)val standardSDF new SimpleDateFormat(yyyy-MM-dd HH:mm:ss)val hourSDF new SimpleDateFormat(HH)val minuteSDF new SimpleDateFormat(mm)// 由于原始数据中没有日期我们假设为今天的数据val today dateSDF.format(new Date)val spark SparkSession.builder().appName(OriginTransform).getOrCreate()val sc: SparkContext spark.sparkContextimport spark.implicits._// Read origin data.val lineRDD: RDD[Array[String]] sc.textFile(originFilePath).map(line line.split(\t))// 处理时间并写入到样例类OutputDefine中val resultRDD lineRDD.map(x {// 拼接今天的日期 和原始数据中的时分秒转换为时间戳val ts standardSDF.parse(today x(0)).getTimeval hour hourSDF.format(new Date(ts)).toIntval minute minuteSDF.format(new Date(ts)).toInt// case class OutputDefine(date:String, hour:Int, minute:Int, userID:String, topic:String, resultRank:Int, clickRank:Int, url:String)val userID x(1)val topic x(2).replace([, ).replace(], )val resultRank x(3).toIntval clickRank x(4).toIntval url x(5)try{OutputDefine2(ts, today, hour, minute, userID, topic, resultRank, clickRank, url)}catch {case e:Exception {e.printStackTrace()throw new Exception(sorigin: $userID, $topic, $url)}}})// 转换为Dataframe 写入Delta Lake Table执行合并Schema的操作resultRDD.toDF.write.format(delta).mode(overwrite).option(mergeSchema, true).save(outputTablePath)spark.close()} }提交任务 /root/soft/spark-2.4.5-bin-hadoop2.6/bin/spark-submit --class cn.itcast.deltalake.demo.OriginTransform2 --master yarn --deploy-mode client /root/DeltaLakeDemo-1.0.jar /data.txt /delta/silver/basic_minute完成后查看结果 转存失败重新上传取消 11.2.3. 2.3 聚合每小时的数据统计每小时TOP10 基于刚刚的基础数据统计每小时TOP10 生成格式如下 datehourtopicranknum2020-06-066中国人均GDP突破2W美元112342020-06-066可控核聚变有望在年内实现商用21001 示例代码 package cn.itcast.deltalake.demoimport java.text.SimpleDateFormat import java.util.Dateimport org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.json.JSONObjectimport scala.collection.mutable import scala.util.parsing.json.JSON/*** 作者传智播客* 描述统计每小时TOP100*/case class HourTOP(date:String, hour:Int, topic:String, rank:Int, num:Int)object HourTop10 {def main(args: Array[String]): Unit {val hourSDF new SimpleDateFormat(yyyy-MM-dd HH)val inputTablePath args(0)val outputTablePath args(1)// Get spark contextval spark SparkSession.builder().master(local[*]).appName(AggregationByMinute).getOrCreate()val sc: SparkContext spark.sparkContextimport spark.implicits._// 读取表val inputDF spark.read.format(delta).load(inputTablePath).toDF()inputDF.createOrReplaceTempView(t_basic) // 注册表val frame: DataFrame spark.sql(select ts, topic from t_basic) // 查询ts和topic// 获取小时 和 对应的topicval hourWithTopic: RDD[(String, String)] frame.map(x {val hour hourSDF.format(new Date(x.getLong(0)))val topic x.getString(1)hour - topic}).rdd// 用Map做topic和次数的统计val topicWithCountMap mutable.Map[String, Int]()// 聚合小时以当前小时内的聚合统计次数的Map作为valueval hourWithMap: RDD[(String, mutable.Map[String, Int])] hourWithTopic.groupByKey().mapValues(y {y.foreach(x {var topicCount: Int topicWithCountMap.getOrElse(x, 0)topicCount 1 // 如果没有这个key得到的是01后正好是1put进去如果得到了就原样1topicWithCountMap (x - topicCount) // 次数1})topicWithCountMap // 返回这个Map})// 转换这个Map为top10并写入样例类val hourTOPListRDD: RDD[List[HourTOP]] hourWithMap.map(x {// List[(String, Int)] List[(topic, num)]val top10: List[(String, Int)] x._2.toList.sortBy(_._2).reverse.take(10)val date x._1.split( )(0)val hour x._1.split( )(1).toIntvar rank 0top10.map(y {rank 1HourTOP(date, hour, y._1, rank, y._2)})})// 由于RDD没有flatten方法所以用flatMapmap原样返回即可就相当于flatten了。val resultRDD: RDD[HourTOP] hourTOPListRDD.flatMap(x x)// 转换为Dataframe写入Delta Lake TableresultRDD.toDF().write.format(delta).save(outputTablePath)spark.close()} }提交任务 /root/soft/spark-2.4.5-bin-hadoop2.6/bin/spark-submit --class cn.itcast.deltalake.demo.HourTop10 --master yarn --deploy-mode client /root/DeltaLakeDemo-1.0.jar /delta/silver/basic_minute /delta/gold/hourTOP10完成后查看表 转存失败重新上传取消 11.2.4. 2.4 统计全天热门TOP100 同样基于那一份基础表数据来统计一下全天热门TOP100对每小时热门TOP10的代码简单修改即可。 输出表结构 datetopicranknum2020-06-06中国人均GDP突破2W美元112342020-06-06可控核聚变有望在年内实现商用21001 示例代码 package cn.itcast.deltalake.demoimport java.text.SimpleDateFormat import java.util.Dateimport org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.json.JSONObjectimport scala.collection.mutable import scala.util.parsing.json.JSON/*** 作者传智播客* 描述统计每天TOP100*/case class DayTOP(date:String, topic:String, rank:Int, num:Int)object DayTop100 {def main(args: Array[String]): Unit {val daySDF new SimpleDateFormat(yyyy-MM-dd)val inputTablePath args(0)val outputTablePath args(1)// Get spark contextval spark SparkSession.builder().master(local[*]).appName(AggregationByMinute).getOrCreate()val sc: SparkContext spark.sparkContextimport spark.implicits._// 读取表val inputDF spark.read.format(delta).load(inputTablePath).toDF()inputDF.createOrReplaceTempView(t_basic) // 注册表val frame: DataFrame spark.sql(select ts, topic from t_basic) // 查询ts和topic// 获取day 和 对应的topicval dayWithTopic: RDD[(String, String)] frame.map(x {val day daySDF.format(new Date(x.getLong(0)))val topic x.getString(1)day - topic}).rdd// 用Map做topic和次数的统计val topicWithCountMap mutable.Map[String, Int]()// 聚合day以当前day内的聚合统计次数的Map作为valueval dayWithMap: RDD[(String, mutable.Map[String, Int])] dayWithTopic.groupByKey().mapValues(y {y.foreach(x {var topicCount: Int topicWithCountMap.getOrElse(x, 0)topicCount 1 // 如果没有这个key得到的是01后正好是1put进去如果得到了就原样1topicWithCountMap (x - topicCount) // 次数1})topicWithCountMap // 返回这个Map})// 转换这个Map为top100并写入样例类val dayTOPListRDD: RDD[List[DayTOP]] dayWithMap.map(x {// List[(String, Int)] List[(topic, num)]val top100: List[(String, Int)] x._2.toList.sortBy(_._2).reverse.take(100)val date x._1var rank 0top100.map(y {rank 1DayTOP(date, y._1, rank, y._2)})})// 由于RDD没有flatten方法所以用flatMapmap原样返回即可就相当于flatten了。val resultRDD: RDD[DayTOP] dayTOPListRDD.flatMap(x x)// 转换为Dataframe写入Delta Lake TableresultRDD.toDF().write.format(delta).save(outputTablePath)spark.close()} }提交任务 /root/soft/spark-2.4.5-bin-hadoop2.6/bin/spark-submit --class cn.itcast.deltalake.demo.DayTop100 --master yarn --deploy-mode client /root/DeltaLakeDemo-1.0.jar /delta/basic_minute /delta/gold/dayTOP100完成后查看数据 转存失败重新上传取消 11.2.5. 2.5 将输出的数据合并为1个parquet文件 导出到其它目录供其它程序如ETL、数据管道进行使用。 执行(Spark-Shell) val inPath /delta/gold/dayTOP100 val outPath /deltaExport/dayTOP100Result val numFiles 1spark.read.format(delta).load(path).repartition(numFiles).write.option(dataChange, false).format(parquet).save(outPath)然后将生成的最新版本的那一个单独的parquet复制出来即可 验证 spark.read.format(parquet).load(/deltaExport/dayTOP100Result/xxxxx.parquet).toDF.show这样我们的结果就导出为一个普通的parquet文件了这个文件可以被其它程序方便的使用。 PS: 你可以选择导出为多个parquet文件具体合并为几个看你的数据量大小。 一般合并为1个即可。 11.3. 3. 总结 至此我们已经完成了这两个需求的开发。 那么当然在企业内数据工程师除了开发SparkJob进行数据计算外也会有一些其他的操作如 数据错误回滚版本导出数据为清单文件合并维护delta表数据分区等等 这一系列的操作我们就不演示了基于在前面Spark-Shell中学习到的操作我们就可以完成对DeltaTable的这些操作。 那么本章的主要目的是为了 演示如何在IDEA中开发Delta Lake相关项目体验一下Delta Lake所推崇的多跳思想 青铜数据原始数据白银数据生成的基础表黄金数据统计的每小时TOP和每天TOP的结果表体验Delta Lake和Spark的集成 其实95%都是Spark的原本代码我们只是在读数据和写数据上用到了Delta Lake而已。所以对于企业来说想要从原有的数据湖迁移到Delta Lake是非常方便的。 关于Delta Lake的部分就到此结束下一章我们了解一下在AWS上的数据湖实现方案介绍。 12. 十、基于AWS的云上数据湖实现方案介绍[了解] 步骤 了解云平台的概念拓展经验了解在AWS云上的数据湖方案拓展经验。 12.1. 1. 云平台的介绍 12.1.1. 1.1 前言 随着云计算概念的不断落地和推广, 目前云平台已经得到了非常广泛的使用. 云平台帮助用户在: 应用落地服务落地安全保障性能 等方面获得比传统方式更高效, 更节省, 更稳定, 更方便的优势. 12.1.2. 1.2 云平台的概念 云平台也称云计算平台. 云计算, 顾名思义, 就是将计算在云上运行. 那么在这里面的两个概念 计算: 这是一个范围很大的名词, 除了能指代业务数据的计算等, 更多时候是指代服务 或者 应用云: 通俗的理解就是远程计算机, 并且是一组 一堆, 它们远程为使用者提供服务, 提供计算. 我们可以这样理解: 云平台 就是 一个云上的平台, 为用户提供各种各样的 远程服务 转存失败重新上传取消 12.1.2.1. 举个例子: 现在有一个 人力外包中心, 其内部有非常多的人力资源可供客户购买使用. 那么有一个客户, 从人力资源外包公司, 花钱雇10个人干活, 发现效率不行又雇了100个一起干活, 最终活儿按时完成. 转存失败重新上传取消 那么, 上述例子就是对云平台的一种模拟. 人力资源中心 提供的是服务, 提供的是资源, 客户只需要按量购买即可. 在例子中, 客户如果不使用人力资源中心 就需要自己招聘相应的员工, 签订劳务合同, 让自己的员工去为自己服务. 但是, 如果需求结束了, 员工又不能随意辞退, 那么这些员工就相当于资源闲置了. 而人力资源中心 就是提供了 资源 供客户使用, 按需求 按用量付费即可, 用完即停止. 对客户来说资源没有闲置. PS: 现在很多软件人力外包, 就是这样的思路. 很多甲方公司, 不愿意招聘正式员工, 仅仅某个项目需求人手, 就从外包公司招人来做, 项目完成, 人员也就遣返回外包公司了. 甲方按人数和时间给外包公司付款. 回到计算机的世界中 云平台提供的就是 计算的资源. 那么计算的资源主要有: 硬件资源: 主要指服务器 交换机 磁盘 GPU等硬件资源软件资源: 主要指 各种软件工具 如域名服务 虚拟内网 数据库软件 等. 云平台为客户提供了 一站式的解决方案. 客户可以没有任何一台服务器 同样可以搭建起来自己的业务. 业务 就运行在云平台之上. 转存失败重新上传取消 通俗的理解, 使用了云平台之后, 客户就不需要自行搭建机房了, 不需要自购服务器了. 服务器等硬件资源 从云平台购买使用即可. 并且因为云平台上的资源是很多的, 如果客户觉得资源不足, 可以追加购买. 如果觉得资源过多, 可以减少购买. 灵活方便. 毕竟, 自建机房成本很高, 并且服务器等硬件购买是一次性. 买回来发现用不到,造成资源的闲置 也是无可避免. 特别是某些业务突增的需求, 导致资源紧张, 临时加了N台服务器. 等到业务下降的时候, 这么多追加的服务器的资源就闲置了. 消费者还能在闲鱼让闲置游起来. 但是服务器领域............就算也能各种二手倒腾, 在机房频繁的上架下架 也是很繁琐的. 特别是运维同学, 估计要打人...... 12.1.3. 1.3 云平台的分类 云平台主要有 2大类, 分别是: 12.1.3.1. 私有云平台 私有云平台, 简称私有云 顾名思义就是私人的云平台, 一般是企业自行搭建, 提供给企业内部去使用. 如, 各个业务部门 或者各个项目组作为客户, 从平台上购买资源,或者申请资源去使用费用一般企业内部结算。 是一种提高企业内 资源利用率的手段同时基于云平台上提供的各种服务也方便企业内部的开发。 但对于企业本身来说, 其硬件资源是自行组建的.(如 自建机房 自购服务器等) 12.1.3.2. 公有云平台 公有云平台就是提供给大众使用的云平台. 任何人 或者任何企业 均可以在公有云平台内去 购买 申请 相应的资源. 对于公有云平台的提供商来说, 其本身的硬件资源是自行组建的(如自建机房, 自购服务器, 搭建数据中心) 本次课程, 主要给大家讲解公有云平台. 12.1.4. 1.4 主流公有云平台 提供公有云服务的平台有许多, 我们来列举一下(顺序不代表排名): AWS: Amazon web service: 是亚马逊提供的一个公有云平台. 也是最早提供云平台服务的一批企业. 也是目前全球公有云的龙头标杆. 在全球市场占有率处于领先地位. 就如苹果带领手机的发展方向一样, AWS目前处于引领云平台发展方向的地位.Azure: 微软提供的一个公有云平台. 市场占有率一般, 目前处于上升期.GCP: Google cloud paltform: 谷歌提供的云平台, 占有率还行, 也是处于上升期.阿里云: 阿里提供的云平台, 在国内市场很强势. 处于No.1地位, 在国际上占有率一般, 处于上升期.腾讯云: 腾讯提供的云平台, 国内占有率还行, 价格便宜, 目前正在大片的抢占市场.京东云 \ 金山云 \ 时速云 : 占有率比较低. 处于下层梯队. 12.1.5. 1.5 云的三种服务 那么我们再来了解一下PaaS SaaS IaaS 12.1.5.1. IaaS Infrastructure as a Service: 基础设施即服务, 是指把IT基础设施作为服务提供 12.1.5.2. PaaS Platform as a Service : 平台即服务.是指将平台作为一种服务对外提供. 那么我们要学习的云平台, 就是一种PaaS服务. 其他还有如 腾讯地图开发平台 等提供平台服务的 12.1.5.3. SaaS Software as a Service: 软件即服务, 是指将软件作为一种服务对外提供. 如阿里云提供的 云上数据库 就是将数据库软件作为服务对外提供. 还有如 GitHub也是一种SaaS服务 转存失败重新上传取消 那么我们理解, 阿里云是一个PaaS平台, 提供IaaS 和 SaaS服务. 12.1.6. 1.6 公有云对企业或者个人的意义 我们撇开私有云不说, 单说公有云. 公有云的出现, 其实极大的提升了社会的运作效率, 提高了小企业和个人的业务竞争力. 比如, 你是一个小企业或者个人开发者, 在没有公有云之前, 你最少也要请1个运维, 买几台或者租用几台服务器, 租用共享机房,租用带宽 才能提供一个软件部署运行的环境. 这一套不简单, 很繁琐, 费时费力. 并且租金一租一年半年的, 临时加机器也麻烦. 但是公有云的出现, 就可以轻松快速的构建好自己的服务器环境, 创业者只需要专心在业务研发上面,而不用费心费力的去在 机房 服务器等方面费心. 从硬件成本的角度来看, 自购服务器 租用机房的成本是比较低的. 毕竟服务器是一次性付费, 永久使用. 但是在创业初期, 一次性付费服务器是不现实的, 一台服务器的价格 如果业务量不大, 够在公有云上用2年了. 等到业务发展起来之后才去考虑自购服务器.. 上面还只是讨论的硬件成本. 其实综合来说,云还是要省心. 其提供的优点太多, 最主要的就是方便. 对于很多企业来说, 方便省心才是主要. 因为他们的业务能够挣得更多.所以要确保注意力尽量关注与业务 而不是运维 哪怕多花钱在云上也是值得的. 总的来说, 小规模使用, 公有云 简单 灵活 还便宜. 大规模使用, 可能费用会高一些, 但是绝对的省心和方便. 因为云平台除了提供虚拟服务器以外, 还提供许多SaaS服务,我们后面就会学到... 而这些SaaS服务才是黏住用户宁愿多花钱也要上云的最主要原因. 12.2. 2. AWS的数据湖解决方案 那么现在我们来了解一下在AWS上架构数据湖的方案 AWS上有众多服务无论是PaaS还是IaaS层亦或是SaaS层均有大量的适用于企业开展业务的服务。 转存失败重新上传取消 基于这些云上服务数据湖所需要的整个生态体系基本上都有可用的服务。 我们在回头来看一下在前面看过的这张图片 转存失败重新上传取消 可以看出如果进行粗略的划分数据湖实现分为 数据存储核心数据分析核心数据输入数据输出数据应用报表、API等以及围绕数据管理的一些功能权限、安全、审计等 那么基于这些划分我们来看一下AWS之上分别有什么解决方案。 12.2.1. 2.1 存储层[重点] 对于数据湖来说存储层是其核心在AWS之上的核心存储称之为AWS S3 转存失败重新上传取消 去服务器化使用S3这个服务不需要依赖服务器 S3是一款分布式的对象存储系统。 可以当做是AWS云平台之上的HDFS也可以认为它是一个大型的企业私有存储网盘。 S3服务贯穿整个AWS云平台基本上所有AWS上的服务只要对数据有需求都可以操纵S3进行数据的存取。 使用S3来作为数据湖的存储核心在合适不过。其完美的支持了数据湖对存储层的要求 集中存储的需求不限格式任意数据存储的需求随处访问视AWS上的权限配置只要你有权限在AWS上任何服务内都可以直接访问S3内的数据支持多种分析框架 12.2.2. 2.2 数据分析[重点] 在AWS之上提供了多种数据分析的框架 去服务器化Server less 12.2.2.1. Server Less的大数据分析引擎Amazon Athena 转存失败重新上传取消 Athena 没有服务器是AWS上的一款SaaS服务你只需要创建Athena并在云上使用它即可。 它可以快速的针对S3内的文件进行交互式查询读时模式并且由于和S3存储是独立的服务所以存储和计算是分离的。 你可以非常容易的动态扩展Athena的算力只需要鼠标点击即可完成Athena的扩容并在不用的时候随时进行缩容或者停止服务。 12.2.2.2. 测试 在athena上建表 CREATE EXTERNAL TABLE testdata (time STRING,userid STRING,topic STRING,rank_result INT,rank_click INT,url STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY \t LOCATION s3://test-datalake/test/;然后就可以执行SQL查询了 12.2.2.3. AWS之上的Hadoop EMR [重点] 转存失败重新上传取消 在AWS之上你可以快速基于EMR服务快速的计算Hadoop计算集群。 并基于S3进行的存储完全解耦计算和存储EMR中的Hadoop支持从S3读写数据也就是将S3当成HDFS用 EMR提供了快速的集群构建方式只需要鼠标点击几下即可构建出可用的Hadoop集群自动安装部署 并且提供了2种运行模式 集群模式利用EMR自动化的创建出符合要求的Hadoop集群并长期运行步骤执行特色在构建的时候提供给EMR需要执行的任务并规划好集群算力EMR就会自动的创建集群如Spark、MR集群然后自动运行你提供的任务 在执行完成后便停止服务删除集群。 如果说集群模式就是一个自动化安装脚本的话其实重要性并不是很大大不了我们手工安装一套Hadoop、Spark环境也不是太过于困难。 EMR的核心其实还是步骤运行的模式这是一种典型的计算和存储分离的思想。 数据在S3中保存着当我需要计算这些数据的时候比如Spark任务。 我只需要将任务提交给EMR并告知EMR比如我要开启10台4核8G的机器来进行计算那么EMR就会自动的按照你的要求 构建好10台机器的Spark计算集群然后将你提交给EMR的任务提交到Spark中进行计算任务完成后集群停止并删除 并且如果你觉得算力不够你可以在AWS的EMR上开启100台服务器开启10000台服务器去计算你想要的任务都是可以的。 并且这些集群在执行完成后就会自动删除。这样就节省了巨大的成本。 我们知道在计算和存储分离的情况下计算集群只有有任务的时候其才在发挥其存在的价值。 如果没有任务要计算计算集群的存在就是在烧钱。 并且在云服务上服务器都是按照小时计费的。在不计算的时候就不需要集群在需要计算的时候才需要集群。并且用几个小时就付几个小时的钱。 这对于企业来说是非常高效并节省成本的计算方式。 12.2.3. 2.3 数据处理ETL[了解] 转存失败重新上传取消 AWS之上提供了许多数据处理ETL的工具。 比如刚刚介绍过的EMR作为计算引擎完全可以做ETL的任务计算。 同时AWS还提供了 Glue完全托管的ETL服务无服务器架构鼠标点击即可完成ETL任务。并支持使用Scala Python对ETL流程进行自定义。 Lambda是一款无服务的代码运行服务。你可以将你的代码逻辑提交给Lambda即可运行不需要服务器不需要执行环境只要提交给Lambda你的逻辑代码它就可以运行你的代码执行你想要的操作。 那么基于Lambda进行ETL处理也是很合适的。它可以监听S3并配置触发事件自动监听并自动执行你提交给它的代码逻辑。 12.2.4. 2.4 AWS上的实时流服务[了解] 转存失败重新上传取消 AWS提供了实时流服务Amazon Kinesis 这是一款完全托管的实时流计算服务。 server less 我们可以认为它是云上的Flink Spark Streaming Kinesis是一款SaaS服务是一款无服务器的服务同样只需要你点击鼠标配置几下即可创建好Kinesis服务并让其工作。 同时基于云服务的特点想要提高性能提升算力也是鼠标点击几下即可烧钱十分方便。 12.2.5. 2.5 AWS上的数仓服务[了解] 转存失败重新上传取消 我们说过数据湖和数仓是不同的概念数据湖绝对不是替代数仓的概念。 数仓和数据湖应该是互补的。 数据湖提供统一存储全量数据分析 数仓提供基于业务系统提供严谨的数据分析 在AWS之上提供的数仓服务称为Redshift 这是一款分布式的数仓服务其底层基于Postgresql开发兼容Postgresql的使用 你可以将Redshift当成一款分布式、高并发、大容量的Postgresql数据库去使用。 同时一样支持 动态扩容存储层面海量数据动态扩容算力层面按需付费 12.2.6. 2.6 AWS上的KV存储NoSQL - DynamoDB[了解] DynamoDB是AWS上的一款NoSQL服务。列式存储数据库KV形式的数据库。 HBase 提供KV形式的数据存储和查询。 12.2.7. 2.7 数据应用[了解] 12.2.7.1. BI 转存失败重新上传取消 基于QuickSight提供数据展现服务BI 12.2.7.2. API服务 API Gateway 是AWS上提供的一款托管的网关服务。 我们可以认为其是AWS上的Nginx服务。 可以配合前面提到的Lambda无服务器代码运行服务快速开发RESTFul形式的API对外提供。 12.2.8. 2.8 安全、审查、授权[了解] 在AWS云平台上所有的权限权利和安全控制相关基于一个服务IAM 它是AWS上的用户认证和访问控制系统。 基于IAM我们可以完成对云上数据湖的数据管控 同时AWS上也提供了统一的日志存储服务CloudWatch 它支持采集所有AWS服务的日志数据并存储。 我们可以基于CloudWatch完成对数据湖内数据审查的需求。 12.2.9. 2.9 AWS数据湖方案总结 转存失败重新上传取消 如上图我们可以基于AWS云服务去构建完整的数据湖、数据仓库、大数据分析存储等大数据应用架构。 基于S3做核心存储基于EMR athnea做分析查询基于glue, lamda, apigateway redshift , dynamodb, kinisis 基于AWS提供的 核心存储S3分析框架AthenaEMR等ETL工具Lambda、Glue流计算框架KinesisNoSQLDynamoDB权限管控IAM统一日志服务CloudWatch 等等一系列框架来完成数据湖架构的实现、或者大数据架构的实现。 数据湖的概念 传统的大数据平台和数据湖看起来架构是一样的只不过定位不懂执行的工作也不同。DeltaLake框架的入门操作 95%的代码都是普通的Spark SparkSQL代码将传统的数据湖迁移到DeltaLake架构是很简单的。只需要一点点代码的更改就能完成。DeltaLake为我们提供了一系列增强的功能事务控制版本回退schema管理等等在云上的数据湖的实现 介绍了什么是云平台AWS常用的服务 以及针对数据湖核心和周边的一些辅助工具的介绍。 End
http://www.zqtcl.cn/news/169882/

相关文章:

  • 用vs2012做网站案例企业现在有必要做网站吗
  • 网站建设少用控件wordpress默认分类
  • php网站是什么数据库文件网站开发收
  • 新网网站空间做网站和app哪类商标
  • drupal网站建设数据库厦门市网站建设
  • 解释微信微网站室内设计效果图展板
  • 教做发绳的网站游戏网站建设需要多少钱
  • 那个网站可以做双色球号码对比的网站设计好学吗
  • 网站建设如何获取客户韩国建筑网站
  • 固始网站建设公司wordpress会员功能
  • 在哪找做调查赚钱的网站好自己做网站的成本
  • 网站开发职业xshuan主题wordpress
  • 网站代码框架建设网站需要学什么程序
  • 广州搜索seo网站优化企业邮箱密码忘了怎么重置密码
  • 重庆模板网站建设做新房网站怎么弄
  • 深圳做企业网站公司常用的网络营销方式
  • 建设网站公司怎么建站网站开发笔记
  • 网站网页建设论文惠州建设网站公司
  • 中介做网站的别打电话有没有教做健身餐的网站
  • 山东电力建设网站雷州市网站建设
  • 企业网站的意义公司网站建app
  • 网站设计模板免费国庆图片制作小程序
  • 包头焦点网站建设郑州包装设计公司
  • 建行官方网站首页做跨境电商亏死了
  • 河北智能网站建设平台卖链接的网站
  • 网站建设简单点的服装搭配网站建设策划书
  • 哪一个军事网站做的比较好今天第四针最新消息
  • 黄页网站推广app软件查企业公司用什么软件
  • 网站设计机构培训全自动网页制作系统源码
  • 外贸网站建设收益深圳建设厅官网