企业网站的搭建流程,代理公司招标流程,上海学网站建设,杭州seo外包文章目录MapReduce简介MapTaskReduceTaskMapper阶段解读Reducer阶段解读MapReduce适用的问题MapReduce的特点MapReduce基本思想大数据处理思想#xff1a;分而治之构建抽象模型#xff1a;Map 函数和 Reduce 函数上升到架构#xff1a;并行自动化并隐藏底层细节MapReduce计算…
文章目录MapReduce简介MapTaskReduceTaskMapper阶段解读Reducer阶段解读MapReduce适用的问题MapReduce的特点MapReduce基本思想大数据处理思想分而治之构建抽象模型Map 函数和 Reduce 函数上升到架构并行自动化并隐藏底层细节MapReduce计算架构提供的主要功能MapReduce框架中的名词解释MapReduce与YARNMapReduce的原理MapReduce进程常用数据序列化类型MapReduce实际处理流程FileInputFormat切片机制Mapreduce的shuffle机制MapReduce案例wordcountMapReduce简介
MapReduce是一种可用于数据处理的编程框架。MapReduce采用分而治之的思想把对大规模数据集的操作分发给一个主节点管理下的各个分节点共同完成然后通过整合各个节点的中间结果得到最终结果。简单地说MapReduce就是任务的分解与结果的汇总。
在分布式计算中MapReduce框架负责处理了并行编程中分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等复杂问题把处理过程高度抽象为两个函数map和reducemap负责把任务分解成多个任务reduce负责把分解后多任务处理的结果汇总起来。 Map/Reduce是一个用于大规模数据处理的分布式计算编程模型。 MapReduce程序的工作分两个阶段进行 Map阶段映射 这个函数单独地应用在每个单元格上的操作就属于映射Map)。 由一个或者多个MapTask组成。每个MapTask处理输入数据集合中的一片数据(InputSplit)并将产生的若干个数据片段(一个数据文件)写到本地磁盘上。 Reduce阶段 由一个或者多个ReduceTask组成。ReduceTask则从每个MapTask上远程拷贝相应的数据片段经分组聚集和归约后将结果写到HDFS上作为最终结果。 使用需要定义map函数和reduce函数 map函数用来处理原始数据初始键值对以生成一批中间的key/value对 reduce函数将 所有这些中间的有着相同key的values合并起来。 输入到每一个阶段均是键 - 值对。
MapTask
执行过程概述 首先通过用户提供的InputFormat将对应的InputSplit解析成一系列key/value,并依次交给用户编写的map()函数处理接着按照指定的Partition对数据分片以确定每个key/value将交给哪个ReduceTask处理之后将数据交给用户定义的Combiner进行一次本地合并(没有则直接跳过)最后即将处理结果保存到本地磁盘上。 具体步骤 1Read阶段MapTask通过用户编写的RecordReader从输入InputSplit中解析出一个个key/value。 2Map阶段该阶段只要是将解析出的key/value交给用户编写的map()函数处理并产生一系列新的key/value。 3Collect阶段在用户编写的map()函数中当数据处理完成后一般会调用OutputCollector.collect()输出结果。在该函数内部它会将生成的ley/value分片(通过调用Partition),并写入一个环形内存缓冲区中。 4Spill阶段即“溢写”当环形缓冲区满后MapReduce会将数据写到本地磁盘上生成一个临时文件。需要注意的是将数据写入本地磁盘之前先要对数据进行一次本地排序并在必要时对数据进行合并操作。 5Combine阶段当所有数据处理完成后MapTask对所有临时文件进行一次合并以确保最终只会生成一个数据文件。
ReduceTask
执行过程概述 ReduceTask的输入数据来自各个MapTask因此首先通过HTTP请求从各个已经运行完成的MapTask所在TaskTracker机器上拷贝相应的数据分片待所有数据拷贝完成后再以key为关键字对所有数据进行排序(sort)通过排序key相同的记录聚集到一起形成若干分组然后将分组数据交给用户编写的reduce()函数处理并将数据结果直接写到HDFS上作为最终输出结果。 具体步骤 1Shuffle阶段也称为Copy阶段。ReduceTask从各个MapTask所在的TaskTracker上远程拷贝一片数据并针对某一片数据如果其大小超过一定阈值则写到磁盘上否则直接放到内存中。 2Merge阶段在远程拷贝数据的同时ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并以防止内存使用过多或磁盘上的文件过多并且可以为后面整体的归并排序减负提升排序效率。 3Sort阶段按照MapReduce的语义用户编写的reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚集在一起Hadoop采用了基于排序的策略。由于各个MapTask已经实现了自己的处理结果进行了局部排序因此ReduceTask只需要对所有数据进行一次归并排序即可。 4Reduce阶段在该阶段中ReduceTask将每组数据依次交给用户编写的reduce()函数处理。 5Write阶段reduce()函数将计算结果写到HDFS上。 Mapper阶段解读
Mapper的输入文件位于HDFS上先对输入数据切分每一个split分块对应一个Mapper任务通过RecordReader对象从输入分块中读取并生成键值对然后执行Map函数输出的中间键值对被partion()函数区分并写入缓冲区同时调用sort()进行排序。
Reducer阶段解读
Reducer主要有三个阶段Shuffle、Sort、Reduce
1 . Shuffle阶段
Reducer的输入就是Mapper阶段已经排好序的输出。在这个阶段框架为每个Reducer任务获得所有Mapper输出中与之相关的分块把Map端的输出结果传送到Reduce端大量操作是数据复制因此也称数据复制阶段。
2 . Sort阶段
框架按照key对Reducer的输入进行分组Mapper阶段时每一个Map任务对于它本身的输出结果会有一个排序分组而不同Map任务的输出中可能会有相同的key因此要再一次分组。Shuffle和Sort是同时进行的Map的输出也是一边被取回一边被合并。排序是基于内存和磁盘的混合模式进行经过多次Merge才能完成排序。PS如果两次排序分组规则需要不同可以指定一个Comparator比较器来控制分组规则。
3 . Reduce阶段
通过Shuffle和Sort操作后得到的key, (list of values)被送到Reducer的reduce()函数中执行针对每一个key, (list of values)会调用一次reduce()函数。
MapReduce适用的问题
用MapReduce来处理的数据集或任务必须具备这样的特点待处理的数据集可以分解成许多小的数据集而且每一个小数据集都可以完全并行地进行处理。
MapReduce的特点
1)MapReduce 易于编程 。它简单的实现一些接口就可以完成一个分布式程序
2)良好的 扩展性 。当你的计算资源不能得到满足的时候你可以通过简单的增加机器来扩展它的计算能力。
3)高容错性 。比如其中一台机器挂了它可以把上面的计算任务转移到另外一个节点上面上运行不至于这个任务运行失败而且这个过程不需要人工参与而完全是由Hadoop 内部完成的。
4)适合 PB 级以上海量数据的离线处理 。比如像毫秒级别的返回一个结果MapReduce 很难做到。MapReduce 虽然具有很多的优势但是它也有不擅长的地方。这里的不擅长不代表它不能做而是在有些场景下实现的效果差并不适合 MapReduce 来处理主要表现在以下几个方面。
1.实时计算。 2.流式计算。流式计算的输入数据时动态的而 MapReduce 的输入数据集是静态的不能动态变化。这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的。
3.DAG有向图计算。多个应用程序存在依赖关系后一个应用程序的输入为前一个的输出。在这种情况下MapReduce 并不是不能做而是使用后每个MapReduce 作业的输出结果都会写入到磁盘会造成大量的磁盘IO导致性能非常的低下。
MapReduce基本思想
大数据处理思想分而治之
并行计算的第一个重要问题是如何划分计算任务或者计算数据以便对划分的子任务或数据块同时进行计算。但是一些计算问题的前后数据项之间存在很强的依赖关系无法进行划分只能串行计算。
对于不可拆分的计算任务或相互间有依赖关系的数据无法进行并行计算。一个大数据若可以分为具有同样计算过程的数据块并且这些数据块之间不存在数据依赖关系则提高处理速度的最好办法就是并行计算。
构建抽象模型Map 函数和 Reduce 函数
Map 函数和 Reduce 函数都是以 keyvalue作为输入的按一定的映射规则转换成另一个或一批 keyvalue 进行输出。
1 Map:k1,v1List(K2,V2)
输入键值对k1,v1表示的数据。 处理数据记录将以“键值对”形式传入 Map 函数Map 函数将处理这些键值对并以另一种键值对形式输出中间结果 List(K2,V2)。 输出键值对List(K2,V2)示的一组中间数据。
2 Reduce:K2,List(V2)→List(K3,V3)
输入由 Map 输出的一组键值对 List(K2,V2)将被进行合并处理同样主键下的不同数值会合并到一个列表List(V2)中故 Reduce 的输入为K2,List(V2)。
处理对传入的中间结果列表数据进行某种整理或进一步的处理并产生最终的输出结果List(K3,V3)。 输出最终输出结果List(K3,V3)。
基于 MapReduce 的并行计算模型如图 3 所示。各个 Map 函数对所划分的数据并行处理从不同的输入数据产生不同的中间结果。
各个 Reduce 函数也各自并行计算负责处理不同的中间结果。进行 Reduce 函数处理之前必须等到所有的 Map 函数完成。
因此在进入 Reduce 函数前需要有一个同步屏障这个阶段也负责对 Map 函数的中间结果数据进行收集整理处理以便 Reduce 函数能更有效地计算最终结果最终汇总所有 Reduce 函数的输出结果即可获得最终结果。 基于MapReduce的并行计算模型Map 函数的输入数据来自于 HDFS的文件块这些文件块的格式是任意类型的可以是文档可以是数字也可以是二进制。文件块是一系列元素组成的集合这些元素也可以是任意类型的。 Map 函数首先将输入的数据块转换成 key,Value 形式的键值对键和值的类型也是任意的。 Map 函数的作用就是把每一个输入的键值对映射成一个或一批新的键值对。输出键值对里的键与输入键值对里的键可以是不同的。 需要注意的是Map 函数的输出格式与 Reduce 函数的输入格式并不相同前者是 List(K2,V2) 格式后者是K2List(V2) 的格式。所以Map 函数的输出并不能直接作为 Reduce 函数的输入。 MapReduce 框架会把 Map 函数的输出按照键进行归类把具有相同键的键值对进行合并合并成 K2,List(V2) 的格式其中List(V2) 是一批属于同一个 K2 的 value。 Reduce 函数的任务是将输入的一系列具有相同键的值以某种方式组合起来然后输出处理后的键值对输出结果一般会合并成一个文件。 为了提高 Reduce 的处理效率用户也可以指定 Reduce 任务的个数也就是说可以有多个 Reduce 并发来完成规约操作。 MapReduce 框架会根据设定的规则把每个键值对输入到相应的 Reduce 任务进行处理。这种情况下MapReduce将会输出多个文件。 一般情况下并不需要把这些输出文件进行合并因为这些文件也许会作为下一个 MapRedue 任务的输入。 上升到架构并行自动化并隐藏底层细节
MapReduce 提供了一个统一的计算框架来完成计算任务的划分和调度数据的分布存储和划分处理数据与计算任务的同步结果数据的收集整理系统通信、负载平衡、计算性能优化、系统结点出错检测和失效恢复处理等。
MapReduce 通过抽象模型和计算框架把需要做什么与具体怎么做分开了为程序员提供了一个抽象和高层的编程接口和框架程序员仅需要关心其应用层的具体计算问题仅需编写少量的处理应用本身计算问题的程序代码。
与具体完成并行计算任务相关的诸多系统层细节被隐藏起来交给计算框架去处理从分布代码的执行到大到数千个小到单个的结点集群的自动调度使用。
MapReduce计算架构提供的主要功能
1任务调度
提交的一个计算作业Job)将被划分为很多个计算任务Tasks)。
任务调度功能主要负责为这些划分后的计算任务分配和调度计算结点Map 结点或 Reduce 结点同时负责监控这些结点的执行状态以及 Map 结点执行的同步控制也负责进行一些计算性能优化处理。例如对最慢的计算任务采用多备份执行选最快完成者作为结果。
2数据/程序互定位
为了减少数据通信量一个基本原则是本地化数据处理即一个计算结点尽可能处理其本地磁盘上分布存储的数据这实现了代码向数据的迁移。
当无法进行这种本地化数据处理时再寻找其他可用结点并将数据从网络上传送给该结点数据向代码迁移)但将尽可能从数据所在的本地机架上寻找可用结点以减少通信延迟。
3出错处理
在以低端商用服务器构成的大规模 MapReduce 计算集群中结点硬件主机、兹盘、内存等出错和软件有缺陷是常态。因此MapReduce 架构需要能检测并隔离出错结点并调度分配新的结点接管出错结点的计算任务。
4分布式数据存储与文件管理
海量数据处理需要一个良好的分布数据存储和文件管理系统作为支撑该系统能够把海量数据分布存储在各个结点的本地磁盘上但保持整个数据在逻辑上成为一个完整的数据文件。
为了提供数据存储容错机制该系统还要提供数据块的多备份存储管理能力。
5Combiner 和 Partitioner
为了减少数据通信开销中间结果数据进入 Reduce 结点前需要进行合并Combine处理即把具有同样主键的数据合并到一起避免重复传送。
一个 Reduce 结点所处理的数据可能会来自多个 Map 结点因此Map 结点输出的中间结果需使用一定的策略进行适当的划分Partition处理保证相关数据发送到同一个 Reduce 结点上。
MapReduce框架中的名词解释
split: 分片是指MapReduce框架将数据源根据一定的规则将源数据分成若干个小数据的过程其中一个小数据集也被称为一个分片。
Map: Map有两层含义
其一、是指MapReduce框架中的Map过程即将一个分片根据用户定义的Map逻辑处理后经由MapReduce框架处理形成输出结果供后续Reduce过程使用 其二是指用户定义Java程序实现Mapper类的map接口的用户自定义逻辑此时通常被称为mapper。 Reduce: Reduce也有两层含义
其一是指MapReduce框架中的Reduce过程即将Map的结果作为输入根据用户定义的Reduce逻辑将结果处理并汇总输出最后的结果 其二是指用户定义Java程序实现Reducer类的reduce接口的用户自定义逻辑此时通常被称为reducer。 Combine: Combine是一个可由用户自定的过程类似于Map和ReduceMapReduce框架会在Map和Reduce过程中间调用Combine逻辑会在下面章节中仔细讲解通常Combine和reduce的用户代码是一样的(也可被称为本地的reduce过程)但是请注意并不是所有用MapReduce框架实现的算法都适合增加Combine过程比如求平均值。
Partition: 在MapReduce框架中一个split对应一个map一个partiton对应一个reduce(无partition指定时由用户配置项指定默认为1个。 reduce的个数决定了输出文件的个数。比如在需求中数据是从对每个省汇总而成要求计算结果按照省来存放则需要根据源数据中的表明省的字段分区用户自定义partition类进行分区。
MapReduce与YARN
YARN概述
Yarn是一个资源调度平台负责为运算程序提供服务器运算资源相当于一个分布式的操作系统平台而mapreduce等运算程序则相当于运行于操作系统之上的应用程序。
YARN中的重要概念 1 yarn并不清楚用户提交的程序的运行机制 2 yarn只提供运算资源的调度用户程序向yarn申请资源yarn就负责分配资源 3 yarn中的主管角色叫ResourceManager 4 yarn中具体提供运算资源的角色叫NodeManager 5 这样一来yarn其实就与运行的用户程序完全解耦就意味着yarn上可以运行各种类型的分布式运算程序mapreduce只是其中的一种比如mapreduce、storm程序spark程序tez等等 。 6 所以spark、storm等运算框架都可以整合在yarn上运行只要他们各自的框架中有符合yarn规范的资源请求机制即可 7 Yarn就成为一个通用的资源调度平台从此企业中以前存在的各种运算集群都可以整合在一个物理集群上提高资源利用率方便数据共享
MapReduce的原理 map以(key, value)的形式输入数据并根据编写的map()处理数据,输出为(key, value)的形式,map的输出经过中间阶段(叫做shuffle)的处理,再以(keyvalue)的形式传入reduce()内进行处理,最后以(key, value)的形式输出最终结果。 一个MapReduce作业(Job)是客户端要执行的一个工作单元:它包括输入数据,MapReduce程序与配置信息.Hadoop将作业分成若干个任务来执行,它包括两类任务:map任务与reduce任务.这些任务分布在集群的不同节点上,由YARN负责调度.如果一个任务失败,它将在另一个不同的节点上重新调度运行.
Hadoop将MapReduce的输入数据划分成等长的小数据块,称为输入分片(input split)或简称分片.Hadoop为每个分片创建一个map任务,并由该任务来运行用户自己定义的map函数从而处理分片中的每条记录.
map任务与reduce任务之间存在一个shuffle,这是MapReduce中最为消耗时间的过程,因为它对数据进行了多次处理,其中包括排序,分区,溢写,combiner等过程.combiner就是一个map端的reduce,可以让数据更加紧凑,所以一般都指定为reduce()所在的类(注意,有些任务中不适用combiner).这一切的处理都是为了减少map任务与reduce任务之间的网络传输,毕竟集群中最为稀缺的资源就是网络带宽,应该想尽办法节省。
分布式的运算程序往往需要分成至少2个阶段
第一个阶段的MapTask并发实例完全并行运行互不相干。
第二个阶段的ReduceTask并发实例互不相干但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
MapReduce编程模型只能包含 一个Map阶段 和 一个Reduce阶段如果用户的业务逻辑非常复杂那就只能 多个MapReduce程序串行运行。MapReduce进程
一个完整的MapReduce程序在分布式运行时有三类实例进程 1MrAppMaster负责整个程序的 过程调度 及 状态协调。 2MapTask负责 Map阶段的 整个数据处理流程。 3ReduceTask负责 Reduce阶段的 整个数据处理流程。
常用数据序列化类型 MapReduce实际处理流程
mapreduce 其实是分治算法的一种现所谓分治算法就是“就是分而治之 将大的问题分解为相同类型的子问题最好具有相同的规模对子问题进行求解然后合并成大问题的解。
mapreduce就是分治法的一种将输入进行分片然后交给不同的task进行处理然后合并成最终的解。 mapreduce实际的处理过程可以理解为Input-Map-Sort-Combine-Partition-Reduce-Output。
1Input阶段
数据以一定的格式传递给Mapper有TextInputFormatDBInputFormatSequenceFileFormat等可以使用在Job.setInputFormat可以设置也可以自定义分片函数。
2map阶段
对输入的(keyvalue)进行处理即map(k1,v1)-list(k2,v2),使用Job.setMapperClass进行设置。
3Sort阶段
对于Mapper的输出进行排序使用Job.setOutputKeyComparatorClass进行设置然后定义排序规则。
4Combine阶段
这个阶段对于Sort之后又相同key的结果进行合并使用Job.setCombinerClass进行设置也可以自定义Combine Class类。
5Partition阶段
将Mapper的中间结果按照key的范围划分为R份Reduce作业的个数默认使用HashPartionerkey.hashCode()Integer.MAX_VALUE%numPartitions也可以自定义划分的函数。
使用Job.setPartitionClass设置。
6Reduce阶段
对于Mapper阶段的结果进行进一步处理Job.setReducerClass进行设置自定义的Reduce类。
7Output阶段 Reducer输出数据的格式。
FileInputFormat切片机制
1FileInputFormat切片机制切片定义在InputFormat类中的getSplit()方法 2FileInputFormat中默认的切片机制 简单地按照文件的内容长度进行切片 切片大小默认等于block大小 切片时不考虑数据集整体而是逐个针对每一个文件单独切片 。比如待处理数据有两个文件 file1.txt 320M file2.txt 10M
经过FileInputFormat的切片机制运算后形成的切片信息如下
file1.txt.split1-- 0~128 file1.txt.split2-- 128~256 file1.txt.split3-- 256~320 file2.txt.split1-- 0~10M 3FileInputFormat中切片的大小的参数配置 通过分析源码在FileInputFormat中计算切片大小的逻辑Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由这几个值来运算决定 minsize默认值1 配置参数 mapreduce.input.fileinputformat.split.minsize maxsize默认值Long.MAXValue 配置参数mapreduce.input.fileinputformat.split.maxsize blocksize 因此默认情况下切片大小blocksize maxsize切片最大值 参数如果调得比blocksize小则会让切片变小而且就等于配置的这个参数的值 minsize 切片最小值 参数调的比blockSize大则可以让切片变得比blocksize还大
选择并发数的影响因素 运算节点的硬件配置 运算任务的类型CPU密集型还是IO密集型 运算任务的数据量
Mapreduce的shuffle机制
MapReduce计算模型主要由三个阶段构成Map、Shuffle、Reduce。 1Map是映射负责数据的过滤分类将原始数据转化为键值对 2Reduce是合并将具有相同key值的value进行处理后再输出新的键值对作为最终结果 3为了让Reduce可以并行处理Map的结果必须对Map的输出进行一定的排序与分割然后再交给对应的Reduce这个过程就是Shuffle。Shuffle过程包含Map Shuffle和Reduce Shuffle。 1概述
mapreduce中map阶段处理的数据如何传递给reduce阶段是mapreduce框架中最关键的一个流程这个流程就叫shuffle。 shuffle: 洗牌、发牌——核心机制数据分区排序缓存。 具体来说就是将maptask输出的处理结果数据分发给reducetask并在分发的过程中对数据按key进行了分区和排序。 分区partition确定哪个数据进入哪个reduce Sort根据key排序 Combiner进行局部value的合并
2详细流程 1、 maptask收集我们的map()方法输出的kv对放到内存缓冲区中 2、 从内存缓冲区不断溢出本地磁盘文件可能会溢出多个文件 3、 多个溢出文件会被合并成大的溢出文件 4、 在溢出过程中及合并的过程中都要调用partitoner进行分组和针对key进行排序 5、 reducetask根据自己的分区号去各个maptask机器上取相应的结果分区数据 6、 reducetask会取到同一个分区的来自不同maptask的结果文件reducetask会将这些文件再进行合并归并排序 7、 合并成大文件后shuffle的过程也就结束了后面进入reducetask的逻辑运算过程从文件中取出一个一个的键值对group调用用户自定义的reduce()方法 Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率原则上说缓冲区越大磁盘io的次数越少执行速度就越快。 缓冲区的大小可以通过参数调整, 参数io.sort.mb 默认100M
MapReduce案例wordcount
wordcount是最简单也是最能体现MapReduce思想的程序之一可以称为MapReduce版Hello World单词计数主要完成功能是统计一系列文本文件中每个单词出现的次数即简单如下图所示
WordcountMapper.java
package wordcount;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*KEYIN, VALUEIN, KEYOUT, VALUEOUT
* 四个泛型解释
* KEYINK1的类型
* VALUEINV1的类型
*
* KEYOUTK2的类型
* VALUEOUTV2的类型
* */
public class WordCountMapper extends MapperLongWritable,Text,Text,LongWritable {//map方法就是将K1和V1转化为K2和V2/*参数key K1行偏移量valueV1 每一行的文本数据context表示上下文对象*/Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {Text text new Text();LongWritable longWritable new LongWritable();//将一行的文本数据进行拆分String[] split value.toString().split(,);//遍历数组进行组装K2和v2for (String word:split){//将K2和V2写入上文text.set(word);longWritable.set(1);context.write(text,longWritable);}// 将K2和v2写入上下文中}
}dataReduce.java
package wordcount;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;/*
* 四个泛型解释
* KEYINK2类型
* VALUEINV2类型
* KEYOUTK3类型
* VALUEOUTV3类型
*
* */
public class dataReduce extends ReducerText, LongWritable,Text,LongWritable {//把新的K2和V2转为K3和V3 将K3和V3写入上下文中/*参数key新K2values集合 新V2context表示上下文对象** K2 v2* hello 1,1*K3 v3hello 2*** */Overrideprotected void reduce(Text key, IterableLongWritable values, Context context) throws IOException, InterruptedException {long count0;//1.遍历结合将集合中数字相加得到v3for (LongWritable longWritable:values){countlongWritable.get();}//2.将K3和V3写入上下文中context.write(key,new LongWritable(count));}
}
TaskMain.java
package wordcount;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;public class TaskMain extends Configured implements Tool {//该方法用于指定一个Job任务public int run(String[] strings) throws Exception {//创建一个Job任务对象Job job Job.getInstance(super.getConf(), wordcount);job.setJarByClass(TaskMain.class);//2.获得job对象八个步骤//第一步指定文件的读取方式和读取路径job.setInputFormatClass(TextInputFormat.class);
// TextInputFormat.addInputPath(job,new Path(file:///D:\\mapreduce_data));TextInputFormat.addInputPath(job,new Path(hdfs://Master:9000/wordcount));//第二步指定map阶段的处理方式和数据类型job.setMapperClass(WordCountMapper.class);//设置map阶段K2的类型job.setMapOutputKeyClass(Text.class);//设置map阶段V2的类型job.setMapOutputValueClass(LongWritable.class);//第三、四、五、六 采用默认方式//第七步指定Reduc阶段的处理方式和数据类型job.setReducerClass(dataReduce.class);//设置K3的类型 v3的类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);//第八步设置输出类型,设置输出的路径job.setOutputFormatClass(TextOutputFormat.class);
// TextOutputFormat.setOutputPath(job,new Path(file:///D:\\test\\output2));TextOutputFormat.setOutputPath(job,new Path(hdfs://Master:9000/wordcount_results2));//等待任务结束boolean flag job.waitForCompletion(true);return flag?0:1;}public static void main(String[] args) throws Exception {Configuration configurationnew Configuration();int runToolRunner.run(configuration,new TaskMain(),args); //启动JOb任务System.exit(run);}
}提示 Mapper类的对象 一行一行地读取 原始数据的中内容 每读一行 就调用一次map方法 切割第一行的单词 生成键值对 K是单词 V是单词的数量 然后把所有键值对写入到临时文件 最后对临时文件排序 类似sql的groupby 随后shuffle随后Reducer读数据 每次读一组
输入拆分
输入到MapReduce工作被划分成固定大小的块叫做 input splits 输入折分是由单个映射消费输入块。
映射 - Mapping
这是在 map-reduce 程序执行的第一个阶段。在这个阶段中的每个分割的数据被传递给映射函数来产生输出值。在我们的例子中映射阶段的任务是计算输入分割出现每个单词的数量(更多详细信息有关输入分割在下面给出)并编制以某一形式列表单词出现频率
重排
这个阶段消耗映射阶段的输出。它的任务是合并映射阶段输出的相关记录。在我们的例子同样的词汇以及它们各自出现频率。
Reducing 在这一阶段从重排阶段输出值汇总。这个阶段结合来自重排阶段值并返回一个输出值。总之这一阶段汇总了完整的数据集。 在我们的例子中这个阶段汇总来自重排阶段的值计算每个单词出现次数的总和。
wordcount项目在MapReduce计算框架下的处理流程 首先通过job.waitForCompletion(true)开启了WordCount这个MapReduce作业后续通过InputFormat的实现类FileInputFormat将输入数据即输入文件分片从而得到Map方法即Map用户定义的方法的输入即图中所示FileInputFormat将文件按照行分割并组织成为的形式成为用户Map方法的输入其中Key是字符的偏移量value即一行的内容。 数据被输入到用户定义的map方法中map方法以文件中的每行数据作为输入将每行按照空格分词并将每个词组织为K-V对输出Map的输出交予了MapReduce框架来进行处理简单来说MapReduce框架将这些K-V对依照key的字典顺序由小到大排列并对相同的key的value进行合并为数组list输出给combine过程 将map方法的输出结果根据Key排序完成之后如果有combine过程被定义这时候MapReduce框架就会调用Combine过程。Combine过程是由用户指定的必须的过程一般Combine过程在逻辑上就是Reduce过程map的输出结果需要通过网络传递给reduce其作用是减少Map的输出的结果集的大小从而降低网络的开销。 用户通过job.setCombinerClass(IntSumReducer.class)指定Combine的实现类Combine其实就是在Map端先执行一次用户的reduce方法先在中间进行一次计算从而将结果集减少但是需要注意的是并不是所有的算法都适用进行多次reduce计算请谨慎选择 然后多个map的结果汇集到reduce由于WordCount就开启了一个reduce故只有一个reduce接收所有map端的输出在输入到用户定义的reduce方法之前MapReduce框架还会进行一步排序操作这步操作类似于在map端进行的排序将相同key的value合并为list不同的是排序的输入是来自于多个Map的输出是根据key排序的K-V对数据 经过排序后的K-ValueList对被输入到的Reduce方法在WordCount的reduce方法中它对每个key对应的value的list进行求和从而获得每个单词的总的出现次数。