网站的栏目,吴中网站开发建设多少钱,WordPress主题开发核心知识,舆情报告模板目录
一、什么是MapReduce
二、MapReduce 的设计思想 2.1 分而治之 2.2 构建抽象模型#xff1a;Map和Reduce 2.3 隐藏系统层细节
三、MapReduce 的框架原理 3.1 MRv1工作原理 3.1.1 MRv1架构工作原理图 3.1.1.1 流程说明 3.1.1.1.1 作业的提交 3.1.1.1.2 作业的初始化 3…
目录
一、什么是MapReduce
二、MapReduce 的设计思想 2.1 分而治之 2.2 构建抽象模型Map和Reduce 2.3 隐藏系统层细节
三、MapReduce 的框架原理 3.1 MRv1工作原理 3.1.1 MRv1架构工作原理图 3.1.1.1 流程说明 3.1.1.1.1 作业的提交 3.1.1.1.2 作业的初始化 3.1.1.1.3 任务的分配 3.1.1.1.4 任务的执行 3.1.1.1.5 进度和状态的更新 3.1.1.1.6 作业的完成 3.1.1.2 组件说明 3.1.1.2.1 Mapper和Reducer 3.1.1.2.2 JobTracker 3.1.1.2.3 TaskTracker 3.1.1.2.4 JobClient 3.1.1.2.5 MapTask和ReduceTask 3.1.1.2.5.1 MapTask工作机制 3.1.1.2.5.2 ReduceTask工作机制
3.1.2 MapReduce工作原理图逻辑实体角度 3.1.2.1 流程说明 3.1.2.1.1 split 阶段 3.1.2.1.2 map 阶段 3.1.2.1.3 Shuffle 阶段 3.1.2.1.4 Reduce 阶段 四、MapReduce 的基本使用案例 4.1 MapReduce编程规范 4.1.1 编写 Mapper 类 4.1.2 编写 Reducer 类 4.1.3 Driver 阶段 4.2 案例说明wordcount案例 4.2.1 split分割
4.2.2 执行Map方法
4.2.3 排序及Combine
4.2.4 执行Reduce方法 五、性能优化 5.1 Mapreduce 性能影响因素分析 5.1.1 计算机性能 5.1.2 I/O 操作优化 5.1.2.1 数据倾斜 5.1.2.2 map 和 reduce 数设置不合理 5.1.2.3 map 运行时间太长导致 reduce 等待过久 5.1.2.4 小文件过多 5.1.2.5 大量的不可分块的超大文件 5.1.2.6 spill 次数过多 5.1.2.7 merge 次数过多等 5.2 优化方法 5.2.1 数据输入 5.1.2 Map 阶段 5.1.3 Reduce 阶段 5.1.4 数据倾斜问题 3.1.5 常用的调优参数 一、什么是MapReduce
MapReduce是一个用于大规模数据处理的分布式计算模型最初由Google工程师设计并实现的Google已经将完整的MapReduce论文公开发布了。其中的定义是MapReduce是一个编程模型是一个用于处理和生成大规模数据集的相关的实现。用户定义一个map函数来处理一个Key-Value对以生成一批中间的Key-Value对再定义一个reduce函数将所有这些中间的有相同Key的Value合并起来。很多现实世界中的任务都可用这个模型来表达。
二、MapReduce 的设计思想 2.1 分而治之
简化并行计算的编程模型 2.2 构建抽象模型Map和Reduce
开发人员专注于实现Mapper和Reducer函数 2.3 隐藏系统层细节
开发人员专注于业务逻辑实现
三、MapReduce 的框架原理 3.1 MRv1工作原理 3.1.1 MRv1架构工作原理图 3.1.1.1 流程说明 3.1.1.1.1 作业的提交
JobClient的submitJob()方法实现的作业提交过程如下所示
1通过JobTracker的getNewJobId()方法向jobtracker请求一个新的作业ID。参见步骤2。
2检查作业的输出说明也就是说要指定输出目录的路径但是输出目录还不能存在(防止覆盖输出结果)如果不满足条件就会将错误抛给MapReduce程序。
3检查作业的输入说明也就是说如果输入路径不存在作业也没法提交如果不满足条件就会将错误抛给MapReduce程序。
4将作业运行所需的资源比如作业JAR文件、配置文件等复制到HDFS中。参见步骤3。
5通过JobTracker的submitJob()方法告诉jobtracker作业准备执行。参见步骤4。 3.1.1.1.2 作业的初始化
1JobTracker接收到对其submitJob()方法调用之后就会把此调用放入一个内部队列当中交由作业调度器进行调度。(说明Hadoop作业的调度器常见的有3个先进先出调度器容量调度器公平调度器。Hadoop作业调度器采用的是插件机制即作业调度器是动态加载的、可插拔的同时第三方可以开发自己的作业调度器。参见步骤5。
2初始化包括创建一个表示正在运行作业的对象——封装任务的记录信息以便跟踪任务的状态和进程。参见步骤5。
3接下来要创建运行任务列表作业调度器首先从共享文件系统中获取JobClient已计算好的输入分片信息然后为每个分片创建一个map任务(也就是说mapper的个数与分片的数目相同)。参见步骤6。(创建reduce任务的数量由JobConf的mapred.reduce.task属性决定它是用setNumReduceTasks()方法来设置的然后调度器创建相应数量的要运行的reduce任务默认情况只有一个reducer) 3.1.1.1.3 任务的分配
1tasktracker本身运行一个简单的循环来定期发送”心跳(heartbeat)”给jobtracker。什么是心跳呢就是tasktracker告诉jobtracker它是否还活着同时心跳也充当两者之间的消息通信比如tasktracker会指明它是否已经做好准备来运行新的任务了如果是管理者jobtracker就会给执行者tasktracker分配一个任务。参见步骤7。
2当然在管理者jobtracker为执行者tasktracker选择任务之前jobtracker必须先选定任务所在的作业。一旦选择好作业jobtracker就可以给tasktracker选定一个任务。如何选择一个作业呢当然是Hadoop作业的调度器了它就像是Hadoop的中枢神经系统一样默认的方法是简单维护一个作业优先级列表。(对于调度算法的更深理解可以学习操作系统的作业调度算法进程调度算法比如先来先服务(FCFS)调度算法短作业优先(SJF)调度算法优先级调度算法高响应比优先调度算法时间片轮转调度算法多级反馈队列调度算法等。如果从更高的角度来看调度算法其实是一种控制和决策的策略选择。) 3.1.1.1.4 任务的执行
1作业选择好了任务也选择好了接下来要做的事情就是任务的运行了。首先从HDFS中把作业的JAR文件复制到tasktracker所在的文件系统同时tasktracker将应用程序所需要的全部文件从分布式缓存复制到本地磁盘也就是从HDFS文件系统复制到ext4等文件系统之中。参见步骤8。
2tasktracker为任务新建一个本地工作目录并把JAR文件中的内容解压到这个文件夹中新建一个TaskRunner实例来运行该任务。
3TaskRunner启动一个新的JVM(参见步骤9)来运行每个任务(参见步骤10)以便用户定义的map和reduce函数的任何缺陷都不会影响TaskTracker守护进程(比如导致它崩溃或者挂起)。需要说明一点的是对于map和reduce任务tasktracker有固定数量的任务槽准确数量由tasktracker核的数量和内存大小来决定比如一个tasktracker可能同时运行两个map任务和reduce任务。map任务和reduce任务中关于数据本地化部分不再讲解因为DRCP没有用到只要理解本地数据级别就可以了比如node-localrack-localoff-switch。
4子进程通过umbilical接口与父进程进行通信任务的子进程每隔几秒便告诉父进程它的进度直到任务完成。 3.1.1.1.5 进度和状态的更新 1MapReduce是Hadoop的一个离线计算框架运行时间范围从数秒到数小时因此对于我们而言直到作业进展是很重要的。
2一个作业和每个任务都有一个状态信息包括作业或任务的运行状态(比如运行状态成功完成失败状态)、Map和Reduce的进度、计数器值、状态消息和描述(可以由用户代码来设置)等。
3这些消息通过一定的时间间隔由Child JVM—TaskTracker—JobTracker汇聚。JobTracker将产生一个表明所有运行作业及其任务状态的全局视图。可以通过Web UI查看。同时JobClient通过每秒查询JobTracker来获得最新状态输出到控制台上。
4现在可能会有一个疑问这些状态信息在作业执行期间不断变化它们是如何与客户端进行通信的呢详细细节不在讲解参考资料《Hadoop权威指南》。 3.1.1.1.6 作业的完成
1当jobtracker收到作业最后一个任务已完成的通知后便把作业的状态设置为”成功”。然后在JobClient查询状态时便知道作业已成功完成于是JobClient打印一条消息告知用户最后从runJob()方法返回。
说明
MapReduce容错即作业失败情况不再讲解参考资料《Hadoop权威指南》。 3.1.1.2 组件说明 3.1.1.2.1 Mapper和Reducer
运行在Hadoop上的MapReduce应用程序最基本的组成部分包括一是Mapper抽象类一是Reducer抽象类一是创建JobConf的执行程序。 3.1.1.2.2 JobTracker
JobTracker是一个master服务软件启动之后JobTracker接收Job负责调度Job的每一个子任务Task运行于TaskTracker上并且监控它们的运行如果发现有失败的Task就重新运行它一般情况下应该把JobTracker部署在单独的机器上。 3.1.1.2.3 TaskTracker
TaskTracker是运行在多个节点上的slaver服务。TaskTracker主动与JobTracker通信(与DataNode和NameNode相似通过心跳来实现)接收作业并负责直接执行每一个任务。 3.1.1.2.4 JobClient
每一个Job都会在用户端通过JobClient类将应用程序以及配置参数Configuration打包成JAR文件存储在HDFS中并把路径提交到JobTracker的master服务然后由master创建每一个Task(即MapTask和ReduceTask)将它们分发到各个TaskTracker服务中去执行。 3.1.1.2.5 MapTask和ReduceTask
一个完整的Job会自动依次执行Mapper、Combiner(在JobConf指定Combiner时执行)和Reducer其中Mapper和Combiner是由MapTask调用执行Reduce则由ReduceTask调用Combiner实际也是Reducer接口类的实现。Mapper会根据Job JAR中定义的输入数据集key1, value1对读入处理完成生成临时的key2, value2对如果定义了CombinerMapTask会在Mapper完成调用该Combiner将相同Key的值做合并处理以减少输出结果集。MapTask的任务全部完成后交给ReduceTask进程调用Reducer处理生成最终结果Key3, value3对。 3.1.1.2.5.1 MapTask工作机制
1. 并行度决定机制
1问题引出 maptask 的并行度决定 map 阶段的任务处理并发度进而影响到整个 job的处理速度。 那么mapTask 并行任务是否越多越好呢
2MapTask 并行度决定机制 一个 job 的 map 阶段 MapTask 并行度个数由客户端提交 job 时的切片个数决定。
2. MapTask工作机制
1Read 阶段Map Task 通过用户编写的 RecordReader按照 InputSplit 记录的位置信息读取数据从中解析出一个个 Key,Value。
2Map 阶段将解析出的 key/value 交给用户编写 map()函数处理并产生一系列新的 key/value。
3Collect 收集阶段在用户编写 map()函数中当数据处理完成后一般会调用 OutputCollector.collect()输出结果。在该函数内部它会将生成的 key/value 分区调用 Partitioner并写入一个环形内存缓冲区中。
4Spill 阶段即 **溢写** 当环形缓冲区满后MapReduce 会将数据写到本地磁盘上生成一个临时文件。需要注意的是将数据写入本地磁盘之前先要对数据进行一次本地排序并在必要时对数据进行合并、压缩等操作。 溢写阶段详情 步骤 1利用快速排序算法对缓存区内的数据进行排序排序方式是先按照分区编号 partition 进行排序然后按照 key 进行排序。这样经过排序后数据以分区为单位聚集在一起且同一分区内所有数据按照 key 有序。 步骤 2按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件 output/spillN.outN 表示当前溢写次数中。如果用户设置了Combiner则写入文件之前对每个分区中的数据进行一次聚集操作。 步骤 3将分区数据的元信息写到内存索引数据结构 SpillRecord 中其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小 。 如 果 当 前 内 存 索 引 大 小超过 1MB 则将内存索引写到文件output/spillN.out.index 中。
5Combine 阶段当所有数据处理完成后MapTask 对所有临时文件进行一次合并以确保最终只会生成一个数据文件。 当所有数据处理完后MapTask 会将所有临时文件合并成一个大文件并保存到文件 output/file.out 中同时生成相应的索引文件 output/file.out.index。
在进行文件合并过程中MapTask 以分区为单位进行合并。对于某个分区它将采用多轮递归合并的方式。每轮合并 io.sort.factor默认 100个文件并将产生的文件重新加入待合并列表中对文件排序后重复以上过程直到最终得到一个大文件。
让每个 MapTask 最终只生成一个数据文件可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。 3.1.1.2.5.2 ReduceTask工作机制
1.设置 ReduceTask 并行度个数 reducetask 的并行度同样影响整个 job 的执行并发度和执行效率但与maptask 的并发数由切片数决定不同Reducetask 数量的决定是可以直接手动设置
//默认值是 1手动设置为 5
job.setNumReduceTasks(5);
2.注意 1reducetask0 表示没有 reduce 阶段输出文件个数和 map 个数一致。 2reducetask 默认值就是 1所以输出文件个数为一个。 3如果数据分布不均匀就有可能在 reduce 阶段产生数据倾斜 4reducetask 数量并不是任意设置还要考虑业务逻辑需求有些情况下需要计算全局汇总结果就只能有 1 个 reducetask。 5具体多少个 reducetask需要根据集群性能而定。 6如果分区数不是 1但是 reducetask 为1是否执行分区过程。答案是不执行分区过程。因为在maptask的源码中执行分区的前提是先判断reduceNum个数是否大于 1。不大于 1 肯定不执行。
3.ReduceTask 工作机制 1Copy 阶段ReduceTask 从各个 MapTask 上远程拷贝一片数据并针对某一片数据如果其大小超过一定阈值则写到磁盘上否则直接放到内存中。 2Merge 阶段在远程拷贝数据的同时ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并以防止内存使用过多或磁盘上文件过多。 3Sort阶段按照MapReduce语义用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起Hadoop采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序因此ReduceTask 只需对所有数据进行一次归并排序即可。 4Reduce 阶段reduce()函数将计算结果写到 HDFS 上。
3.1.2 MapReduce工作原理图逻辑实体角度 3.1.2.1 流程说明 3.1.2.1.1 split 阶段
首先 mapreduce 会根据要运行的大文件来进行 split每个输入分片(input split)针对一个 map 任务输入分片(InputSplit)存储的并非数据本身而是一个分片长度和一个记录数据位置的数组。输入分片(InputSplit)通常和 HDFS 的 block(块)关系很密切假如我们设定 HDFS 的块的大小是 128MB我们运行的大文件是128x10MBMapReduce 会分为 10 个 MapTask每个 MapTask 都尽可能运行在block(块)所在的 DataNode 上体现了移动计算不移动数据的思想。 3.1.2.1.2 map 阶段
map 阶段就是执行自己编写的 Mapper 类中的 map 函数Map 过程开始处理MapTask 会接受输入分片通过不断的调用 map()方法对数据进行处理。处理完毕后转换为新的 KEY,VALUE键值对输出。 3.1.2.1.3 Shuffle 阶段
shuffle 阶段主要负责将 map 端生成的数据传递给 reduce 端因此 shuffle 分为在 map 端的过程和在 reduce 端的执行过程。具体过程如下 1MapTask 收集 map()方法的输出KEY,VALUE对放到内存缓冲区称为环形缓冲区中其中环形缓冲区的大小默认是 100MB。 2环形缓冲区到达一定阈值环形缓冲区大小的 80%时会将缓冲区中的数据溢出本地磁盘文件这个过程中可能会溢出多个文件。 3多个溢出文件会被合并成大的溢出文件。 4在溢出过程及合并的过程中都要调用 Partitioner 进行分区和针对 key进行排序 sort。 5合并成大文件后Map 端 shuffle 的过程也就结束了后面进入 reduce端 shuffle 的过程。 6在 Reduce 端shuffle 主要分为复制 Map 输出(copy)、排序合并Merge Sort两个阶段。
Reduce 任务通过 HTTP 向各个 Map 任务拖取它所需要的数据。 Copy 过来的数据会先放入内存缓冲区中如果内存缓冲区中能放得下这次数据的话就直接把数据写到内存中即内存到内存 merge。Reduce 要向每个 Map去拖取数据在内存中每个 Map 对应一块数据当内存缓存区中存储的 Map 数据占用空间达到一定程度的时候开始启动内存中 merge把内存中的数据 merge输出到磁盘上一个文件中即内存到磁盘 merge。
当属于该 reducer 的 map 输出全部拷贝完成则会在 reducer 上生成多个文件如果拖取的所有 map 数据总量都没有超出内存缓冲区则数据就只存在于内存中这时开始执行合并操作即磁盘到磁盘 merge。 3.1.2.1.4 Reduce 阶段
Reduce 从合并的文件中取出一个一个的键值对 group调用用户自定义的 reduce()方法生成最终的输出文件。
注意: Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率原则上说缓冲区越大磁盘io的次数越少执行速度就越快。 缓冲区的大小可以通过参数调整参数io.sort.mb 默认 100M。 四、MapReduce 的基本使用案例 4.1 MapReduce编程规范
需要重点明确两点
1. 一个记录调用一次 map()方法。 2. 相同的 key 调用一次 reduce()方法。 4.1.1 编写 Mapper 类
1用户自定义的 Mapper 要继承框架提供的 Mapper 类。
2Mapper 的输入数据是 KV 键值对的形式KV 的类型可自定义。
3对数据的处理逻辑写在 Mapper 类中 map()方法中。
4Mapper 的输出数据是 KV 键值对的形式KV 的类型可自定义。
5map()方法maptask 进程每一个K,V数据执行一次。 4.1.2 编写 Reducer 类
1用户自定义的 Reducer 要继承框架提供的 Reducer 父类。
2Reducer 的输入数据类型对应 Mapper 的输出数据类型也是 KV。
3Reducer 的业务逻辑写在 reduce()方法中。
4每一组相同 k 的k,Iterator组调用一次 reduce()方法。 4.1.3 Driver 阶段
整个程序需要编写一个 Driver 来进行提交将自定义 Mapper 和 Reducer 类
组合成一个 job并提交 job 对象 4.2 案例说明wordcount案例 4.2.1 split分割 首先Map阶段框架会将用户输入分割成固定大小的片段随后将每个片段进一步分解成一批键值对作为map函数的输入
4.2.2 执行Map方法 4.2.3 排序及Combine 4.2.4 执行Reduce方法 五、性能优化 5.1 Mapreduce 性能影响因素分析 5.1.1 计算机性能
CPU、内存、磁盘健康、网络 5.1.2 I/O 操作优化 5.1.2.1 数据倾斜 5.1.2.2 map 和 reduce 数设置不合理 5.1.2.3 map 运行时间太长导致 reduce 等待过久 5.1.2.4 小文件过多 5.1.2.5 大量的不可分块的超大文件 5.1.2.6 spill 次数过多 5.1.2.7 merge 次数过多等 5.2 优化方法 5.2.1 数据输入
合并小文件在执行 mr 任务前将小文件进行合并大量的小文件会产生大量的 map 任务增大 map 任务装载次数而任务的装载比较耗时从而导致mr 运行较慢。 5.1.2 Map 阶段
1减少溢写spill次数通过调整 io.sort.mb 及 sort.spill.percent参数值增大触发 spill 的内存上限减少 spill 次数从而减少磁盘 IO。
2减少合并merge次数通过调整 io.sort.factor 参数增大 merge 的文件数目减少 merge 的次数从而缩短 mr 处理时间。
3在 map 之后不影响业务逻辑前提下先进行 combine 处理减少 I/O。 5.1.3 Reduce 阶段
1合理设置 map 和 reduce 数两个都不能设置太少也不能设置太多。太少会导致 task 等待延长处理时间太多会导致 map、reduce 任务间竞争资源造成处理超时等错误。
2设置 map、reduce 共存调整 slowstart.completedmaps 参数使 map 运行到一定程度后reduce 也开始运行减少 reduce 的等待时间。
3使用 reduce因为 reduce 在用于搜集数据集的时候将会产生大量的网络消耗。
4合理设置 reduce 端的 buffer默认情况下数据达到一个阈值的时候buffer中的数据就会写入磁盘然后 reduce 会从磁盘中获得所有的数据。也就是说buffer 和reduce 是没有直接关联的中间多个一个写磁盘-读磁盘的过程既然有这个弊端那么就可以通过参数来配置使得 buffer 中的一部分数据可以直接输送到reduce从而减少IO开销mapred.job.reduce.input.buffer.percent默认为0.0。当值大于0的时候会保留指定比例的内存读buffer中的数据直接拿给reduce使用。这样一来设置 buffer 需要内存读取数据需要内存reduce 计算也要内存所以要根据作业的运行情况进行调整。 5.1.4 数据倾斜问题
1数据倾斜现象 数据频率倾斜——某一个区域的数据量要远远大于其他区域。 数据大小倾斜——部分记录的大小远远大于平均值。
2如何收集倾斜数据 在 reduce 方法中加入记录 map 输出键的详细情况的功能。
3减少数据倾斜的方法 方法 1抽样和范围分区 可以通过对原始数据进行抽样得到的结果集来预设分区边界值。 方法 2自定义分区 基于输出键的背景知识进行自定义分区。例如如果 map 输出键的单词来源于一本书。且其中某几个专业词汇较多。那么就可以自定义分区将这这些专业词汇发送给固定的一部分 reduce 实例。而将其他的都发送给剩余的 reduce 实例。 方法 3Combine 使用 Combine 可以大量地减小数据倾斜。在可能的情况下combine 的目的就是提前聚合并精简数据。 方法 4采用 Map Join尽量避免 Reduce Join。 3.1.5 常用的调优参数
1资源相关参数 1 以下参数是在用户自己的 mr 应用程序中配置就可以生效mapred-default.xml。 2 应 该 在 yarn 启 动 之 前 就 配 置 在 服 务 器 的 配 置 文 件 中 才 能 生 效yarn-default.xml。 3 shuffle 性 能 优化 的 关 键 参 数 应在 yarn 启动 之 前 就配 置 好mapred-default.xml。 2容错相关参数(mapreduce 性能优化) 今天MapReduce的相关内容就分享到这里如果帮助到大家欢约大家点赞关注收藏有疑问也欢迎大家评论留言