扁平化网站首页,aspx做网站,余姚市建设局行政服务中心网站,科技网站设计欣赏http://blog.csdn.net/aijiudu/article/details/72353510 废话不说直接来一张图如下#xff1a; 从JVM的角度看Map和Reduce Map阶段包括#xff1a; 第一读数据#xff1a;从HDFS读取数据 1、问题:读取数据产生多少个Mapper#xff1f;#xff1f; Mapper数据过大的话 从JVM的角度看Map和Reduce Map阶段包括 第一读数据从HDFS读取数据 1、问题:读取数据产生多少个Mapper Mapper数据过大的话会产生大量的小文件由于Mapper是基于虚拟机的过多的Mapper创建和初始化及关闭虚拟机都会消耗大量的硬件资源 Mapper数太小并发度过小Job执行时间过长无法充分利用分布式硬件资源 2、Mapper数量由什么决定 1输入文件数目 2输入文件的大小 3配置参数 这三个因素决定的。 涉及参数 mapreduce.input.fileinputformat.split.minsize //启动map最小的split size大小默认0 mapreduce.input.fileinputformat.split.maxsize //启动map最大的split size大小默认256M dfs.block.size//block块大小默认64M 计算公式splitSize Math.max(minSize, Math.min(maxSize, blockSize)); 例如默认情况下例如一个文件800MBlock大小是128M那么Mapper数目就是7个。6个Mapper处理的数据是128M1个Mapper处理的数据是32M 再例如一个目录下有三个文件大小分别为5M10M 150M 这个时候其实会产生四个Mapper处理的数据分别是5M10M128M22M。 Mapper是基于文件自动产生的如果想要自己控制Mapper的个数 就如上面5M10M的数据很快处理完了128M要很长时间这个就需要通过参数的控制来调节Mapper的个数。 减少Mapper的个数的话就要合并小文件这种小文件有可能是直接来自于数据源的小文件也可能是Reduce产生的小文件。 设置合并器set都是在hive脚本也可以配置Hadoop 设置合并器本身 set hive.input.formatorg.apache.hadoop.hive.ql.io.CombineHiveInputFormat; set hive.merge.mapFilestrue; set hive.merge.mapredFilestrue; set hive.merge.size.per.task256000000;//每个Mapper要处理的数据就把上面的5M10M……合并成为一个 一般还要配合一个参数 set mapred.max.split.size256000000 // mapred切分的大小 set mapred.min.split.size.per.node128000000//低于128M就算小文件数据在一个节点会合并在多个不同的节点会把数据抓过来进行合并。 Hadoop中的参数可以通过控制文件的数量控制mapper数量 mapreduce.input.fileinputformat.split.minsizedefault0小于这个值会合并 mapreduce.input.fileinputformat.split.maxsize 大于这个值会切分 第二处理数据 Partition说明 对于map输出的每一个键值对系统都会给定一个partitionpartition值默认是通过计算key的hash值后对Reduce task的数量取模获得。如果一个键值对的partition值为1意味着这个键值对会交给第一个Reducer处理。 自定义partitioner的情况 1、我们知道每一个Reduce的输出都是有序的但是将所有Reduce的输出合并到一起却并非是全局有序的如果要做到全局有序我们该怎么做呢最简单的方式只设置一个Reduce task但是这样完全发挥不出集群的优势而且能应对的数据量也很受限。最佳的方式是自己定义一个Partitioner用输入数据的最大值除以系统Reduce task数量的商作为分割边界也就是说分割数据的边界为此商的1倍、2倍至numPartitions-1倍这样就能保证执行partition后的数据是整体有序的。 2、解决数据倾斜另一种需要我们自己定义一个Partitioner的情况是各个Reduce task处理的键值对数量极不平衡。对于某些数据集由于很多不同的key的hash值都一样导致这些键值对都被分给同一个Reducer处理而其他的Reducer处理的键值对很少从而拖延整个任务的进度。当然编写自己的Partitioner必须要保证具有相同key值的键值对分发到同一个Reducer。 3、自定义的Key包含了好几个字段比如自定义key是一个对象包括type1type2type3,只需要根据type1去分发数据其他字段用作二次排序。 环形缓冲区 Map的输出结果是由collector处理的每个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间在内存中放置尽可能多的数据。 这个数据结构其实就是个字节数组叫Kvbuffer名如其义但是这里面不光放置了数据还放置了一些索引数据给放置索引数据的区域起了一个Kvmeta的别名在Kvbuffer的一块区域上穿了一个IntBuffer字节序采用的是平台自身的字节序的马甲。数据区域和索引数据区域在Kvbuffer中是相邻不重叠的两个区域用一个分界点来划分两者分界点不是亘古不变的而是每次Spill之后都会更新一次。初始的分界点是0数据的存储方向是向上增长索引数据的存储方向是向下增长Kvbuffer的存放指针bufindex是一直闷着头地向上增长比如bufindex初始值为0一个Int型的key写完之后bufindex增长为4一个Int型的value写完之后bufindex增长为8。 索引是对在kvbuffer中的键值对的索引是个四元组包括value的起始位置、key的起始位置、partition值、value的长度占用四个Int长度Kvmeta的存放指针Kvindex每次都是向下跳四个“格子”然后再向上一个格子一个格子地填充四元组的数据。比如Kvindex初始位置是-4当第一个键值对写完之后(Kvindex0)的位置存放value的起始位置、(Kvindex1)的位置存放key的起始位置、(Kvindex2)的位置存放partition的值、(Kvindex3)的位置存放value的长度然后Kvindex跳到-8位置等第二个键值对和索引写完之后Kvindex跳到-12位置。 第三写数据到磁盘 Mapper中的Kvbuffer的大小默认100M可以通过mapreduce.task.io.sort.mbdefault100参数来调整。可以根据不同的硬件尤其是内存的大小来调整调大的话会减少磁盘spill的次数此时如果内存足够的话一般都会显著提升性能。spill一般会在Buffer空间大小的80%开始进行spill因为spill的时候还有可能别的线程在往里写数据因为还预留空间有可能有正在写到Buffer中的数据可以通过mapreduce.map.sort.spill.percentdefault0.80进行调整Map Task在计算的时候会不断产生很多spill文件在Map Task结束前会对这些spill文件进行合并这个过程就是merge的过程。mapreduce.task.io.sort.factordefault10代表进行merge的时候最多能同时merge多少spill如果有100个spill个文件此时就无法一次完成整个merge的过程这个时候需要调大mapreduce.task.io.sort.factordefault10来减少merge的次数从而减少磁盘的操作 Spill这个重要的过程是由Spill线程承担Spill线程从Map任务接到“命令”之后就开始正式干活干的活叫SortAndSpill原来不仅仅是Spill在Spill之前还有个颇具争议性的Sort。 Combiner存在的时候此时会根据Combiner定义的函数对map的结果进行合并什么时候进行Combiner操作呢和Map在一个JVM中是由min.num.spill.for.combine的参数决定的默认是3也就是说spill的文件数在默认情况下由三个的时候就要进行combine操作最终减少磁盘数据 减少磁盘IO和网络IO还可以进行压缩对spillmerge文件都可以进行压缩。中间结果非常的大IO成为瓶颈的时候压缩就非常有用可以通过mapreduce.map.output.compressdefaultfalse设置为true进行压缩数据会被压缩写入磁盘读数据读的是压缩数据需要解压在实际经验中Hive在Hadoop的运行的瓶颈一般都是IO而不是CPU压缩一般可以10倍的减少IO操作压缩的方式GzipLzo,BZip2,Lzma等其中Lzo是一种比较平衡选择mapreduce.map.output.compress.codecdefaultorg.apache.hadoop.io.compress.DefaultCodec参数设置。但这个过程会消耗CPU适合IO瓶颈比较大。 Shuffle和Reduce阶段包括 一、Copy 1、由于job的每一个map都会根据reduce(n)数将数据分成map 输出结果分成n个partition所以map的中间结果中是有可能包含每一个reduce需要处理的部分数据的。所以为了优化reduce的执行时间hadoop中是等job的第一个map结束后所有的reduce就开始尝试从完成的map中下载该reduce对应的partition部分数据因此map和reduce是交叉进行的其实就是shuffle。Reduce任务通过HTTP向各个Map任务拖取下载它所需要的数据网络传输Reducer是如何知道要去哪些机器取数据呢一旦map任务完成之后就会通过常规心跳通知应用程序的Application Master。reduce的一个线程会周期性地向master询问直到提取完所有数据如何知道提取完数据被reduce提走之后map机器不会立刻删除数据这是为了预防reduce任务失败需要重做。因此map输出数据是在整个作业完成之后才被删除掉的。 2、reduce进程启动数据copy线程(Fetcher)通过HTTP方式请求maptask所在的TaskTracker获取maptask的输出文件。由于map通常有许多个所以对一个reduce来说下载也可以是并行的从多个map下载那到底同时到多少个Mapper下载数据这个并行度是可以通过mapreduce.reduce.shuffle.parallelcopies(default5调整。默认情况下每个Reducer只会有5个map端并行的下载线程在从map下数据如果一个时间段内job完成的map有100个或者更多那么reduce也最多只能同时下载5个map的数据所以这个参数比较适合map很多并且完成的比较快的job的情况下调大有利于reduce更快的获取属于自己部分的数据。 在Reducer内存和网络都比较好的情况下可以调大该参数 3、reduce的每一个下载线程在下载某个map数据的时候有可能因为那个map中间结果所在机器发生错误或者中间结果的文件丢失或者网络瞬断等等情况这样reduce的下载就有可能失败所以reduce的下载线程并不会无休止的等待下去当一定时间后下载仍然失败那么下载线程就会放弃这次下载并在随后尝试从另外的地方下载因为这段时间map可能重跑。reduce下载线程的这个最大的下载时间段是可以通过mapreduce.reduce.shuffle.read.timeoutdefault180000秒调整的。如果集群环境的网络本身是瓶颈那么用户可以通过调大这个参数来避免reduce下载线程被误判为失败的情况。一般情况下都会调大这个参数这是企业级最佳实战。 二、MergeSort 1、这里的merge和map端的merge动作类似只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中然后当使用内存达到一定量的时候才spill磁盘。这里的缓冲区大小要比map端的更为灵活它基于JVM的heap size设置。这个内存大小的控制就不像map一样可以通过io.sort.mb来设定了而是通过另外一个参数 mapreduce.reduce.shuffle.input.buffer.percentdefault 0.7f 源码里面写死了 来设置这个参数其实是一个百分比意思是说shuffile在reduce内存中的数据最多使用内存量为0.7 × maxHeap of reduce task。JVM的heapsize的70%。内存到磁盘merge的启动门限可以通过mapreduce.reduce.shuffle.merge.percentdefault0.66配置。也就是说如果该reduce task的最大heap使用量通常通过mapreduce.admin.reduce.child.java.opts来设置比如设置为-Xmx1024m的一定比例用来缓存数据。默认情况下reduce会使用其heapsize的70%来在内存中缓存数据。假设 mapreduce.reduce.shuffle.input.buffer.percent 为0.7reducetask的max heapsize为1G那么用来做下载数据缓存的内存就为大概700MB左右。这700M的内存跟map端一样也不是要等到全部写满才会往磁盘刷的而是当这700M中被使用到了一定的限度通常是一个百分比就会开始往磁盘刷刷磁盘前会先做sortMerge。这个限度阈值也是可以通过参数 mapreduce.reduce.shuffle.merge.percentdefault0.66来设定。与map 端类似这也是溢写的过程这个过程中如果你设置有Combiner也是会启用的然后在磁盘中生成了众多的溢写文件。这种merge方式一直在运行直到没有map端的数据时才结束然后启动磁盘到磁盘的merge方式生成最终的那个文件。 这里需要强调的是merge有三种形式1)内存到内存memToMemMerger2内存中MergeinMemoryMerger3磁盘上的MergeonDiskMerger具体包括两个一Copy过程中磁盘合并二磁盘到磁盘。 1内存到内存MergememToMemMerger Hadoop定义了一种MemToMem合并这种合并将内存中的map输出合并然后再写入内存。这种合并默认关闭可以通过mapreduce.reduce.merge.memtomem.enabled(default:false) 打开当map输出文件达到mapreduce.reduce.merge.memtomem.threshold时触发这种合并。 2内存中MergeinMemoryMerger当缓冲中数据达到配置的阈值时这些数据在内存中被合并、写入机器磁盘。阈值有2种配置方式 配置内存比例前面提到reduceJVM堆内存的一部分用于存放来自map任务的输入在这基础之上配置一个开始合并数据的比例。假设用于存放map输出的内存为500Mmapreduce.reduce.shuffle.merge.percent配置为0.66则当内存中的数据达到330M的时候会触发合并写入。 配置map输出数量 通过mapreduce.reduce.merge.inmem.threshold配置。在合并的过程中会对被合并的文件做全局的排序。如果作业配置了Combiner则会运行combine函数减少写入磁盘的数据量。 3磁盘上的MergeonDiskMerger 3.1Copy过程中磁盘Merge在copy过来的数据不断写入磁盘的过程中一个后台线程会把这些文件合并为更大的、有序的文件。如果map的输出结果进行了压缩则在合并过程中需要在内存中解压后才能给进行合并。这里的合并只是为了减少最终合并的工作量也就是在map输出还在拷贝时就开始进行一部分合并工作。合并的过程一样会进行全局排序。 3.2最终磁盘中Merge当所有map输出都拷贝完毕之后所有数据被最后合并成一个整体有序的文件作为reduce任务的输入。这个合并过程是一轮一轮进行的最后一轮的合并结果直接推送给reduce作为输入节省了磁盘操作的一个来回。最后所以map输出都拷贝到reduce之后进行合并的map输出可能来自合并后写入磁盘的文件也可能来及内存缓冲在最后写入内存的map输出可能没有达到阈值触发合并所以还留在内存中。 每一轮合并不一定合并平均数量的文件数指导原则是使用整个合并过程中写入磁盘的数据量最小为了达到这个目的则需要最终的一轮合并中合并尽可能多的数据因为最后一轮的数据直接作为reduce的输入无需写入磁盘再读出。因此我们让最终的一轮合并的文件数达到最大即合并因子的值通过mapreduce.task.io.sort.factordefault10来配置。 如上图Reduce阶段中一个Reduce过程 可能的合并方式为假设现在有20个map输出文件合并因子配置为5则需要4轮的合并。最终的一轮确保合并5个文件其中包括2个来自前2轮的合并结果因此原始的20个中再留出3个给最终一轮。 三、Reduce函数调用用户自定义业务逻辑 1、当reduce将所有的map上对应自己partition的数据下载完成后就会开始真正的reduce计算阶段。reducetask真正进入reduce函数的计算阶段由于reduce计算时肯定也是需要消耗内存的而在读取reduce需要的数据时同样是需要内存作为buffer这个参数是控制reducer需要多少的内存百分比来作为reduce读已经sort好的数据的buffer大小默认用多大内存呢默认情况下为0也就是说默认情况下reduce是全部从磁盘开始读处理数据。可以用mapreduce.reduce.input.buffer.percentdefault 0.0(源代码MergeManagerImpl.java674行)来设置reduce的缓存。如果这个参数大于0那么就会有一定量的数据被缓存在内存并输送给reduce当reduce计算逻辑消耗内存很小时可以分一部分内存用来缓存数据可以提升计算的速度。所以默认情况下都是从磁盘读取数据如果内存足够大的话务必设置该参数让reduce直接从缓存读数据这样做就有点Spark Cache的感觉 2、Reduce在这个阶段框架为已分组的输入数据中的每个 key, (list of values)对调用一次 reduce(WritableComparable,Iterator, OutputCollector, Reporter)方法。Reduce任务的输出通常是通过调用 OutputCollector.collect(WritableComparable,Writable)写入文件系统的。Reducer的输出是没有排序的。 性能调优 如果能够根据情况对shuffle过程进行调优对于提供MapReduce性能很有帮助。相关的参数配置列在后面的表格中。 一个通用的原则是给shuffle过程分配尽可能大的内存当然你需要确保map和reduce有足够的内存来运行业务逻辑。因此在实现Mapper和Reducer时应该尽量减少内存的使用例如避免在Map中不断地叠加。 运行map和reduce任务的JVM内存通过mapred.child.java.opts属性来设置尽可能设大内存。容器的内存大小通过mapreduce.map.memory.mb和mapreduce.reduce.memory.mb来设置默认都是1024M。 map优化 在map端避免写入多个spill文件可能达到最好的性能一个spill文件是最好的。通过估计map的输出大小设置合理的mapreduce.task.io.sort.*属性使得spill文件数量最小。例如尽可能调大mapreduce.task.io.sort.mb。 map端相关的属性如下表 reduce优化 在reduce端如果能够让所有数据都保存在内存中可以达到最佳的性能。通常情况下内存都保留给reduce函数但是如果reduce函数对内存需求不是很高将mapreduce.reduce.merge.inmem.threshold触发合并的map输出文件数设为0mapreduce.reduce.input.buffer.percent用于保存map输出文件的堆内存比例设为1.0可以达到很好的性能提升。在2008年的TB级别数据排序性能测试中Hadoop就是通过将reduce的中间数据都保存在内存中胜利的。 reduce端相关属性 通用优化 Hadoop默认使用4KB作为缓冲这个算是很小的可以通过io.file.buffer.size来调高缓冲池大小。 转载于:https://www.cnblogs.com/felixzh/p/8604188.html