当前位置: 首页 > news >正文

建设厅报名网站做PHP网站前端网站进不去

建设厅报名网站,做PHP网站前端网站进不去,商城网站建设教学,网络营销题库案例题文章目录 什么是HadoopHadoop发行版介绍Hadoop版本演变历史Hadoop3.x的细节优化Hadoop三大核心组件介绍HDFS体系结构NameNode介绍总结 SecondaryNameNode介绍DataNode介绍DataNode总结 MapReduce介绍分布式计算介绍MapReduce原理剖析MapReduce之Map阶段MapReduce之Reduce阶段 实… 文章目录 什么是HadoopHadoop发行版介绍Hadoop版本演变历史Hadoop3.x的细节优化Hadoop三大核心组件介绍HDFS体系结构NameNode介绍总结 SecondaryNameNode介绍DataNode介绍DataNode总结 MapReduce介绍分布式计算介绍MapReduce原理剖析MapReduce之Map阶段MapReduce之Reduce阶段 实战WordCount分析实战WordCount案例开发 MapReduce任务日志查看停止Hadoop集群中的任务 MapReduce程序扩展Shuffle过程详解Hadoop中序列化机制InputFormat分析面试题 OutputFormat分析 什么是Hadoop Hadoop是一个适合海量数据的分布式存储和分布式计算的框架。 分布式存储可以简单理解为存储数据的时候数据不只存在一台机器上面它会存在多台机器上面。分布式计算简单理解就是由很多台机器并行处理数据咱们在写java程序的时候写的一般都是单机的程序只在一台机器上运行这样程序的处理能力是有限的。 Hadoop的作者是Doug Cutting他在给这个框架起名字的时候是很偶然的作者的孩子有一个毛绒象玩具他孩子总是对着这个玩具叫 Hadoop、Hadoop、所以作者就以此来命名了。 Hadoop发行版介绍 目前Hadoop发行版非常的多有华为发行版、Intel发行版、Cloudera发行版CDH、Hortonworks 发行版HDP这些发行版都是基于Apache Hadoop衍生出来的 Apache是一个IT领域的公益组织类似于红十字会Apache这个组织里面的软件都是开源的大家可以随便使用随便修改我们后面学习的99%的大数据技术框架都是Apache开源的Cloudera Hadoop(CDH)CDH是一个商业版本它对官方版本做了一些优化提供收费技术支持提供界面操作方便集群运维管理CDH目前在企业中使用的还是比较多的虽然CDH是收费的但是CDH中的一些基本功能是不收费的可以一直使用高级功能是需要收费才能使用的如果不想付费也能凑合着使用HortonWorks(HDP)是开源的也提供的有界面操作方便运维管理一般互联网公司偏向于使用这个注意了再爆一个料最新消息目前HDP已经被CDH收购都是属于一个公司的产品后期HDP是否会合并到CDH中还不得而知具体还要看这个公司的运营策略了 Hadoop版本演变历史 hadoop1.xHDFSMapReduce hadoop2.xHDFSYARNMapReduce hadoop3.xHDFSYARNMapReduce 从Hadoop1.x升级到Hadoop2.x架构发生了比较大的变化这里面的HDFS是分布式存储MapRecue是分布式计算咱们前面说了Hadoop解决了分布式存储和分布式计算的问题对应的就是这两个模块在Hadoop2.x的架构中多了一个模块 YARN这个是一个负责资源管理的模块。 Hadoop的这一步棋走的是最好的这样自己摇身一变就变成了一个公共的平台由于它起步早占有的市场份额也多后期其它新兴起的计算框架一般都会支持在YARN上面运行这样Hadoop就保证了自己的地位。 咱们后面要学的Spark、Flink等计算框架都是支持在YARN上面执行的并且在实际工作中也都是在YARN上面执行。Hadoop3.x的架构并没有发生什么变化但是它在其他细节方面做了很多优化 Hadoop3.x的细节优化 1最低Java版本要求从Java7变为Java82在Hadoop 3中HDFS支持纠删码纠删码是一种比副本存储更节省存储空间的数据持久化存储方法使用这种方法相同容错的情况下可以比之前节省一半的存储空间3 Hadoop 2中的HDFS最多支持两个NameNode一主一备而Hadoop 3中的HDFS支持多个NameNode一主多备4MapReduce任务级本地优化MapReduce添加了映射输出收集器的本地化实现的支持。对于密集型的洗牌操作shuffle-intensivejobs可以带来30%的性能提升5修改了多重服务的默认端口Hadoop2中一些服务的端口和Hadoop3中是不一样的 详细的优化 Hadoop三大核心组件介绍 Hadoop主要包含三大组件HDFSMapReduceYARN HDFS负责海量数据的分布式存储MapReduce是一个计算模型负责海量数据的分布式计算YARN主要负责集群资源的管理和调度 HDFS体系结构 HDFS支持主从结构主节点称为 NameNode 是因为主节点上运行的有NameNode进程 NameNode支持多个目前我们的集群中只配置了一个 从节点称为 DataNode 是因为从节点上面运行的有DataNode进程DataNode支持多个目前我们的集群中有两个 HDFS中还包含一个 SecondaryNameNode 进程这个进程从字面意思上看像是第二个NameNode的意思其实不是 NameNode介绍 NameNode是整个文件系统的管理节点 它主要维护着整个文件系统的文件目录树文件/目录的信息 和 每个文件对应的数据块列表并且还负责接收用户的操作请求 文件/目录的信息表示文件/目录的的一些基本信息所有者 属组 修改时间 文件大小等信息每个文件对应的数据块列表如果一个文件太大那么在集群中存储的时候会对文件进行切割这个时候就类似于会给文件分成一块一块的存储到不同机器上面。所以HDFS还要记录一下一个文件到底被分了多少块每一块都在什么地方存储着接收用户的操作请求其实我们在命令行使用hdfs操作的时候是需要先和namenode通信 才能开始去操作数据的。 我们可以到集群的9870界面查看一下随便找一个文件看一下点击文件名称可以看到Block information 但是文件太小只有一个块 叫Block 0 NameNode主要包括以下文件 这些文件所在的路径是由hdfs-default.xml的dfs.namenode.name.dir属性控制的 hdfs-default.xml文件在哪呢 它在hadoop-3.2.0\share\hadoop\hdfs\hadoop-hdfs-3.2.0.jar中这个文件中包含了HDFS相关的所有默认参数咱们在配置集群的时候会修改一个hdfs-site.xml文件hdfs-site.xml文件属于hdfs-default.xml的一个扩展它可以覆盖掉hdfs-default.xml中同名的参数。 看一下这个文件中的dfs.namenode.name.dir属性: property namedfs.namenode.name.dir/name valuefile://${hadoop.tmp.dir}/dfs/name/value descriptionDetermines where on the local filesystem the DFS name node should store the name table(fsimage). If this is a comma-delimited list of directories then the name table is replicated in all of the directories, for redundancy. /description /property这个属性的值是由hadoop.tmp.dir属性控制的这个属性的值默认在core-default.xml文件中。 里面有edits文件 和fsimage文件: fsimage文件有两个文件名相同的有一个后缀是md5 md5是一种加密算法这个其实主要是为了做md5校验的为了保证文件传输的过程中不出问题相同内容的md5是一样的所以后期如果我把这个fsimage和对应fsimage.md5发给你 然后你根据md5对fsimage的内容进行加密获取一个值 和fsimage.md5中的内容进行比较如果一样说明你接收到的文件就是完整的。 在这里可以把fsimage 拆开 fs 是文件系统 filesystem image是镜像说明是文件系统镜像就是给文件照了一个像把文件的当前信息记录下来我们可以看一下这个文件这个文件需要使用特殊的命令进行查看-i 输入文件 -o 输出文件 里面最外层是一个fsimage标签看里面的inode标签这个inode表示是hdfs中的每一个目录或者文件信息 分析生成的edits.xml文件这个地方注意可能有的edits文件生成的edits.xml为空需要多试几个。 这个edits.xml中可以大致看一下里面有很多record。每一个record代表不同的操作 我们所有对hdfs的增删改操作都会在edits文件中留下信息那么fsimage文件中的内容是从哪来的 其实是这样的edits文件会定期合并到fsimage文件中。 secondarynamenode 这个进程就是负责定期的把edits中的内容合并到fsimage中。他只做一件事这是一个单独的进程在实际工作中部署的时候也需要部署到一个单独的节点上面 总结 fsimage: 元数据镜像文件存储某一时刻NameNode内存中的元数据信息就类似是定时做了一个快照操作。【这里的元数据信息是指文件目录树、文件/目录的信息、每个文件对应的数据块列表】edits: 操作日志文件【事务文件】这里面会实时记录用户的所有操作seen_txid: 是存放transactionId的文件format之后是0它代表的是namenode里面的edits_*文件的尾数,namenode重启的时候会按照seen_txid的数字顺序从头跑edits_0000001~到seen_txid的数字。如果根据对应的seen_txid无法加载到对应的文件NameNode进程将不会完成启动以保护数据一致性。VERSION:保存了集群的版本信息 SecondaryNameNode介绍 SecondaryNameNode主要负责定期的把edits文件中的内容合并到fsimage中 这个合并操作称为checkpoint在合并的时候会对edits中的内容进行转换生成新的内容保存到fsimage文件中。 注意在NameNode的HA架构中没有SecondaryNameNode进程文件合并操作会由standby NameNode负责实现 所以在Hadoop集群中SecondaryNameNode进程并不是必须的。 DataNode介绍 DataNode是提供真实文件数据的存储服务 针对datanode主要掌握两个概念一个是block一个是replication 首先是block HDFS会按照固定的大小顺序对文件进行划分并编号划分好的每一个块称一个BlockHDFS默认Block大小是 128MBBlokc块是HDFS读写数据的基本单位不管你的文件是文本文件 还是视频 或者音频文件针对hdfs而言都是字节。 我们之前上传的一个user.txt文件他的block信息可以在fsimage文件中看到也可以在hdfs webui上面看到, 里面有block的id信息,并且也会显示这个数据在哪个节点上面 这里显示在bigdata02和bigdata03上面都有那我们过去看一下datanode中数据的具体存储位置是由dfs.datanode.data.dir来控制的通过查询hdfs-default.xml可以知道。注意这个block中的内容可能只是文件的一部分如果你的文件较大的话就会分为多个block存储默认 hadoop3中一个block的大小为128M。根据字节进行截取截取到128M就是一个block。如果文件大小没有默认的block块大那最终就只有一个block。 假设我们上传了两个10M的文件 又上传了一个200M的文件 问1会产生多少个block块 4个 问2在hdfs中会显示几个文件3个 下面看一下副本副本表示数据有多少个备份 我们现在的集群有两个从节点所以最多可以有2个备份这个是在hdfs-site.xml中进行配置的 dfs.replication 默认这个参数的配置是3。表示会有3个副本。副本只有一个作用就是保证数据安全 DataNode总结 block块存放在哪些datanode上只有datanode自己知道当集群启动的时候datanode会扫描自己节点上面的所有block块信息然后把节点和这个节点上的所有block块信息告诉给namenode。这个关系是每次重启集群都会动态加载的【这个其实就是集群为什么数据越多启动越慢的原因】 咱们之前说的fsimage(edits)文件中保存的有文件和block块之间的信息。 这里说的是block块和节点之间的关系这两块关联在一起之后就可以根据文件找到对应的block块再根据block块找到对应的datanode节点这样就真正找到了数据。 所以说 其实namenode中不仅维护了文件和block块的信息 还维护了block块和所在的datanode节点的信息。 可以理解为namenode维护了两份关系 第一份关系file 与block list的关系对应的关系信息存储在fsimage和edits文件中,当NameNode启动的时候会把文件中的元数据信息加载到内存中第二份关系datanode与block的关系对应的关系主要在集群启动的时候保存在内存中,当DataNode启动时会把当前节点上的Block信息和节点信息上报给NameNode 注意 我们说了NameNode启动的时候会把文件中的元数据信息加载到内存中然后每一个文件的元数据信息会占用150字节的内存空间这个是恒定的和文件大小没有关系咱们前面在介绍HDFS的时候说过HDFS不适合存储小文件其实主要原因就在这里不管是大文件还是小文件一个文件的元数据信息在NameNode中都会占用150字节NameNode节点的内存是有限的所以它的存储能力也是有限的如果我们存储了一堆都是几KB的小文件最后发现NameNode的内存占满了确实存储了很多文件但是文件的总体大小却很小这样就失去了HDFS存在的价值。 我们前面说了namenode不要随便格式化因为格式化了以后VERSION里面的clusterID会变但是datanode的VERSION中的clusterID并没有变所以就对应不上了。 咱们之前说过如果确实要重新格式化的话需要把/data/hadoop_repo数据目录下的内容都清空全部都重新生成是可以的。 HDFS主要是负责存储海量数据的如果只是把数据存储起来除了浪费磁盘空间是没有任何意义的我们把数据存储起来之后是希望能从这些海量数据中分析出来一些有价值的内容这个时候就需要有一个比较厉害的计算框架来快速计算这一批海量数据所以MapReduce应运而生了。 MapReduce介绍 举个栗子 计算扑克牌中的黑桃个数? 就是我们平时打牌时用的扑克牌现在呢有一摞牌我想知道这摞牌中有多少张黑桃 最直接的方式是一张一张检查并且统计出有多少张是黑桃但是这种方式的效率比较低如果说这一摞牌 只有几十张也就无所谓了如果这一摞拍有上千张呢你一张一张去检查还不疯了 这个时候我们可以使用MapReduce的计算方法 第一步把这摞牌分配给在座的所有玩家第二步让每个玩家查一下自己手中的牌有多少张是黑桃然后把这个数目汇报给你第三步你把所有玩家告诉你的数字加起来得到最终的结果 之前是一张一张的串行计算现在使用mapreduce是把数据分配给多个人并行计算每一个人获得一个局部聚合的临时结果最终再统一汇总一下。 这样就可以快速得到答案了这其实就是MapReduce的计算思想。 分布式计算介绍 移动数据是传统的计算方式现在的一种新思路是移动计算。 如果我们数据量很大的话我们的数据肯定是由很多个节点存储的这个时候我们就可以把我们的程序代码拷贝到对应的节点上面去执行程序代码都是很小的一般也就几十KB或者几百KB加上外部依赖包最大也就几兆 甚至几十兆但是我们需要计算的数据动辄都是几十G、几百G他们两个之间的差距不是一星半点. 这样我们的代码就可以在每个数据节点上面执行了但是这个代码只能计算当前节点上的数据的如果我们想要统计数据的总行数这里每个数据节点上的代码只能计算当前节点上数据的行数所以还的有一个汇总程序这样每个数据节点上面计算的临时结果就可以通过汇总程序得到最终的结果了。此时汇总程序需要传递的数据量就很小了只需要接收一个数字即可。这个计算过程就是分布式计算这个步骤分为两步。 第一步对每个节点上面的数据进行局部计算第二步对每个节点上面计算的局部结果进行最终全局汇总 MapReduce原理剖析 MapReduce是一种分布式计算模型是Google提出来的主要用于搜索领域解决海量数据的计算问题. MapReduce是分布式运行的由两个阶段组成Map和Reduce Map阶段是一个独立的程序在很多个节点同时运行每个节点处理一部分数据。Reduce阶段也是一个独立的程序可以在一个或者多个节点同时运行每个节点处理一部分数据【在这我们先把reduce理解为一个单独的聚合程序即可】。 在这map就是对数据进行局部汇总reduce就是对局部数据进行最终汇总。 结合到我们前面分析的统计黑桃的例子中这里的map阶段就是指每个人统计自己手里的黑桃的个数reduce就是对每个人统计的黑桃个数进行最终汇总 这是一个Hadoop集群一共5个节点 一个主节点四个从节点 这里面我们只列出来了HDFS相关的进程信息 假设我们有一个512M的文件这个文件会产生4个block块假设这4个block块正好分别存储到了集群的4个节点上我们的计算程序会被分发到每一个数据所在的节点然后开始执行计算在map阶段针对每一个block块对应的数据都会产生一个map任务(这个map任务其实就是执行这个计算程序的)在这里也就意味着会产生4个map任务并行执行4个map阶段都执行完毕以后会执行reduce阶段在reduce阶段中会对这4个map任务的输出数据进行汇总统计得到最终的结果。 下面看一个官方的mapreduce原理图 左下角是一个文件文件最下面是几个block块说明这个文件被切分成了这几个block块文件上面是一些split注意咱们前面说的每个block产生一个map任务其实这是不严谨的其实严谨一点来说的话应该是一个split产生一个map任务。 那这里的block和split之间有什么关系吗 我们来分析一下block块是文件的物理切分在磁盘上是真实存在的。是对文件的真正切分 而split是逻辑划分不是对文件真正的切分默认情况下我们可以认为一个split的大小和一个block的大小是一样的所以实际上是一个split会产生一个map task 这里面的map Task就是咱们前面说的map任务看后面有一个reduce Taskreduce会把结果数据输出到hdfs上有几个reduce任务就会产生几个文件这里有三个reduce任务就产生了3个文件咱们前面分析的案例中只有一个reduce任务做全局汇总 注意看map的输入 输出 reduce的输入 输出map的输入是k1,v1 输出是k2,v2 reduce的输入是k2,v2 输出是k3,v3 都是键值对的形式。 在这注意一下为什么在这是1,2,3呢 这个主要是为了区分数据方便理解没有其它含义这是我们人为定义的。 MapReduce之Map阶段 mapreduce主要分为两大步骤 map和reducemap和reduce在代码层面对应的就是两个类map对应的是mapper类reduce对应的是reducer类下面我们就来根据一个案例具体分析一下这两个步骤 假设我们有一个文件文件里面有两行内容 第一行是hello you 第二行是hello me 我们想统计文件中每个单词出现的总次数 首先是map阶段 1第一步框架会把输入文件(夹)划分为很多InputSplit这里的inputsplit就是前面我们所说的split【对文件进行逻辑划分产生的】默认情况下每个HDFS的Block对应一个InputSplit。再通过RecordReader类把每个InputSplit解析成一个一个的k1,v1。默认情况下每一行数据都会被解析成一个k1,v1 这里的k1是指每一行的起始偏移量v1代表的是那一行内容所以针对文件中的数据经过map处理之后的结果是这样的 0hello you 10hello me注意map第一次执行会产生0hello you第二次执行会产生10hello me并不是执行一次就获取到这两行结果了因为每次只会读取一行数据我在这里只是把这两行执行的最终结果都列出来了 2第二步框架调用Mapper类中的map(…)函数map函数的输入是k1,v1输出是k2,v2。一个InputSplit对应一个map task。程序员需要自己覆盖Mapper类中的map函数实现具体的业务逻辑。 因为我们需要统计文件中每个单词出现的总次数所以需要先把每一行内容中的单词切开然后记录出现次数为1,这个逻辑就需要我们在map函数中实现了。那针对0hello you执行这个逻辑之后的结果就是 hello,1 you,1 针对10hello me执行这个逻辑之后的结果是 hello,1 me,13第三步框架对map函数输出的k2,v2进行分区。不同分区中的k2,v2由不同的reduce task处理默认只有1个分区所以所有的数据都在一个分区最后只会产生一个reduce task。 经过这个步骤之后数据没什么变化如果有多个分区的话需要把这些数据根据分区规则分开在这里默认只有1个分区。 hello,1 you,1 hello,1 me,1咱们在这所说的单词计数其实就是把每个单词出现的次数进行汇总即可需要进行全局的汇总不需要进行分区所以一个redeuce任务就可以搞定 如果你的业务逻辑比较复杂需要进行分区那么就会产生多个reduce任务了 那么这个时候map任务输出的数据到底给哪个reduce使用这个就需要划分一下要不然就乱套了。假设有两个reducemap的输出到底给哪个reduce如何分配这是一个问题。 这个问题由分区来完成。map输出的那些数据到底给哪个reduce使用这个就是分区干的事了。 4第四步框架对每个分区中的数据都会按照k2进行排序、分组。分组指的是相同k2的v2分成一个组。先按照k2排序 hello,1 hello,1 me,1 you,1 然后按照k2进行分组把相同k2的v2分成一个组 hello,{1,1} me,{1} you,{1}5第五步在map阶段框架可以选择执行Combiner过程 Combiner可以翻译为规约规约是什么意思呢 在刚才的例子中咱们最终是要在reduce端计算单词出现的总次数的所以其实是可以在map端提前执行reduce的计算逻辑先对在map端对单词出现的次数进行局部求和操作这样就可以减少map端到reduce端数据传输的大小这就是规约的好处当然了并不是所有场景都可以使用规约针对求平均值之类的操作就不能使用规约了否则最终计算的结果就不准确了。Combiner一个可选步骤默认这个步骤是不执行的。 6第六步框架会把map task输出的k2,v2写入到linux 的磁盘文件中 hello,{1,1} me,{1} you,{1}至此整个map阶段执行结束 最后注意一点 MapReduce程序是由map和reduce这两个阶段组成的但是reduce阶段不是必须的也就是说有的 mapreduce任务只有map阶段为什么会有这种任务呢 是这样的咱们前面说过其实reduce主要是做最终聚合的如果我们这个需求是不需要聚合操作直接对数据做过滤处理就行了那也就意味着数据经过map阶段处理完就结束了所以如果reduce阶段不存在的话map的结果是可以直接保存到HDFS中的 注意如果没有reduce阶段其实map阶段只需要执行到第二步就可以第二步执行完成以后结果就可以直接输出到HDFS了。 MapReduce之Reduce阶段 1第一步框架对多个map任务的输出按照不同的分区通过网络copy到不同的reduce节点。这个过程称作shuffle 针对我们这个需求只有一个分区所以把数据拷贝到reduce端之后还是老样子 hello,{1,1} me,{1} you,{1}2第二步框架对reduce端接收的相同分区的k2,v2数据进行合并、排序、分组。 reduce端接收到的是多个map的输出对多个map任务中相同分区的数据进行合并 排序 分组 注意之前在map中已经做了排序 分组这边也做这些操作 重复吗 不重复因为map端是局部的操作 reduce端是全局的操作 之前是每个map任务内进行排序是有序的但是多个map任务之间就是无序的了。 不过针对我们这个需求只有一个map任务一个分区所以最终的结果还是老样子 hello,{1,1} me,{1} you,{1}3 第三步框架调用Reducer类中的reduce方法reduce方法的输入是k2,{v2}输出是k3,v3。一个k2,{v2}调用一次reduce函数。程序员需要覆盖reduce函数实现具体的业务逻辑。 那我们在这里就需要在reduce函数中实现最终的聚合计算操作了将相同k2的{v2}累加求和然后再转化为k3,v3写出去在这里最终会调用三次reduce函数 hello,2 me,1 you,1 第四步框架把reduce的输出结果保存到HDFS中。 hello 2 me 1 you 1至此整个reduce阶段结束。 实战WordCount分析 重新梳理一下单词计数的执行流程 上面的是单个文件的执行流程有一些现象看起来还是不明显 下面我们来看一个两个文件的执行流程 实战WordCount案例开发 前面我们通过理论层面详细分析了单词计数的执行流程下面我们就来实际上手操作一下。 大致流程如下 第一步开发Map阶段代码第二步开发Reduce阶段代码第三步组装Job 在idea中创建WordCountJob类 添加注释梳理一下需求 需求读取hdfs上的hello.txt文件计算文件中每个单词出现的总次数 hello.txt文件内容如下 hello you hello me 最终需要的结果形式如下 hello 2 me 1 you 1 先创建map阶段的代码在这里需要自定义一个mapper类继承框架中的Mapper类 /** * 创建自定义mapper类 */ public static class MyMapper extends MapperLongWritable,Text,Text,LongWr /** * 需要实现map函数 * 这个map函数就是可以接收k1,v1 产生k2,v2 * param k1 * param v1 * param context * throws IOException * throws InterruptedException */ Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {// k1代表的是每一行的行首偏移量v1代表的是每一行内容// 对获取到的每一行数据进行切割把单词切割出来String[] words v1.toString().split( );// 迭代切割出来的单词数据for (String word:words) {// 把迭代出来的单词封装成k2,v2的形式Text k2 new Text(word);LongWritable v2 new LongWritable(1L);// 把k2,v2写出去context.write(k2,v2);} }然后是reduce阶段的代码需要自定义一个reducer类继承框架中的reducer。 /** * 创建自定义的reducer类 */ public static class MyReducer extends ReducerText,LongWritable,Text,LongW /** * 针对v2s的数据进行累加求和 并且最终把数据转化为k3,v3写出去 * param k2 * param v2s * param context * throws IOException * throws InterruptedException */ Override protected void reduce(Text k2, IterableLongWritable v2s, Context co throws IOException, InterruptedException { // 创建一个sum变量保存v2s的和long sum 0L;for (LongWritable v2 : v2s) {sum v2.get();}// 组装k3,v3Text k3 k2;LongWritable v3 new LongWritable(sum);// 把结果写出去context.write(k3,v3);} }最终组装jobjob等于mapreduce public class WordCountJob { /** * 组装jobmapreduce * param args */ public static void main(String[] args) {try {if(args.length!2){// 如果传递的参数不够程序直接退出System.exit(100);}// job需要的配置参数Configuration conf new Configuration();// 创建一个jobJob job Job.getInstance(conf);// 注意这一行必须设置否则在集群中执行的是找不到WordCountJob这个类job.setJarByClass(WordCountJob.class);// 指定输入路径(可以是文件也可以是目录)FileInputFormat.setInputPaths(job,new Path(args[0]));// 指定输出路径(只能指定一个不存在的目录)FileOutputFormat.setOutputPath(job,new Path(args[1]));// 指定map相关的代码job.setMapperClass(MyMapper.class);// 指定k2的类型job.setMapOutputKeyClass(Text.class);// 指定v2的类型job.setMapOutputValueClass(LongWritable.class);// 指定reduce相关的代码job.setReducerClass(MyReducer.class);// 指定k3的类型job.setOutputKeyClass(Text.class);// 指定v3的类型job.setOutputValueClass(LongWritable.class);// 提交jobjob.waitForCompletion(true);}catch (Exception e){e.printStackTrace();}} }现在代码开发完毕了现在我们是把自定义的mapper类和reducer类都放到了这个WordCountJob类 中主要是为了在学习阶段看起来清晰一些所有的代码都在一个类中好找其实我们完全可以把自定义的mapper类和reducer类单独提出去定义为单独的类是没有什么区别的。 ok那代码开发好了以后想要执行我们需要打jar包上传到集群上去执行这个时候需要在pom文件中添加maven的编译打包插件。 buildplugins!-- compiler插件, 设定JDK版本 --plugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversion2.3.2/versionconfigurationencodingUTF-8/encodingsource1.8/sourcetarget1.8/targetshowWarningstrue/showWarnings/configuration/pluginpluginartifactIdmaven-assembly-plugin/artifactIdconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefsarchivemanifestmainClass/mainClass/manifest/archive/configurationexecutionsphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins /build注意了这些添加完以后还有一个地方需要修改需要在pom中的hadoop-client和log4j依赖中增加 scope属性值为provided表示只在编译的时候使用这个依赖在执行以及打包的时候都不使用因为hadoop-client和log4j依赖在集群中都是有的所以在打jar包的时候就不需要打进去了如果我们使用到了集群中没有的第三方依赖包就不需要增加这个provided属性了不增加provided就可以把对应的第三方依赖打进jar包里面了。 接下来就可以向集群提交MapReduce任务了 hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCounthadoop表示使用hadoop脚本提交任务其实在这里使用yarn脚本也是可以的从hadoop2开始支持使用yarn不过也兼容hadoop1也继续支持使用hadoop脚本所以在这里使用哪个都可以具体就看你个人的喜好了我是习惯于使用hadoop脚本jar表示执行jar包db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar指定具体的jar包路径信息com.imooc.mr.WordCountJob指定要执行的mapreduce代码的全路径/test/hello.txt指定mapreduce接收到的第一个参数代表的是输入路径这里的输入路径可以直接指定hello.txt的路径也可以直接指定它的父目录因为它的父目录里面也没有其它无关的文件如果指定目录的话就意味着hdfs会读取这个目录下所有的文件所以后期如果我们需要处理一批文件那就可以把他们放到同一个目录里面直接指定目录即可。/out指定mapreduce接收到的第二个参数代表的是输出目录这里的输出目录必须是不存在的MapReduce程序在执行之前会检测这个输出目录如果存在会报错因为它每次执行任务都需要一个新的输出目录来存储结果数据 MapReduce任务日志查看 如果想要查看mapreduce任务执行过程产生的日志信息怎么办呢 是不是在提交任务的时候直接在这个控制台上就能看到了先不要着急我们先在代码中增加一些日志信息在实际工作中做调试的时候这个也是很有必要的 在自定义mapper类的map函数中增加一个输出将k1,v1的值打印出来 protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { //输出k1,v1的值System.out.println(k1,v1k1.get(),v1.toString());// k1代表的是每一行的行首偏移量v1代表的是每一行内容// 对获取到的每一行数据进行切割把单词切割出来String[] words v1.toString().split( );// 迭代切割出来的单词数据for (String word:words) {// 把迭代出来的单词封装成k2,v2的形式Text k2 new Text(word);LongWritable v2 new LongWritable(1L);System.out.println(k2:word...v2:1);// 把k2,v2写出去context.write(k2,v2);} }在自定义reducer类中的reduce方法中增加一个输出将k2,v2和k3,v3的值打印出来 protected void reduce(Text k2, IterableLongWritable v2s, Context context) throws IOException, InterruptedException {// 创建一个sum变量保存v2s的和long sum 0L;for (LongWritable v2 : v2s) {//输出k2,v2的值System.out.println(k2,v2k2.toString(),v2.get());sum v2.get();}// 组装k3,v3Text k3 k2;LongWritable v3 new LongWritable(sum);//输出k3,v3的值System.out.println(k3,v3k3.toString(),v3.get());// 把结果写出去context.write(k3,v3); }重新在windows机器上打jar包并把新的jar包上传到bigdata01机器的/data/soft/hadoop-3.2.0目录中重新向集群提交任务注意针对输出目录要么换一个新的不存在的目录要么把之前的out目录删掉 等待任务执行结束我们发现在控制台上是看不到任务中的日志信息的为什么呢因为我们在这相当于是通过一个客户端把任务提交到集群里面去执行了所以日志是存在在集群里面的。想要查看需要需要到一个特殊的地方查看这些日志信息 先进入到yarn的web界面访问8088端口点击对应任务的history链接 http://bigdata01:8088/ 在这里我们发现这个链接是打不来的 这里有两个原因第一个原因是没有windows的hosts文件中没有配置bigdata02和bigdata03这两个主机名和ip的映射关系先去把这两个主机名配置到hosts文件里面之前的bigdata01已经配置进去了。 第二个原因就是这里必须要启动historyserver进程才可以并且还要开启日志聚合功能才能在web界面上直接查看任务对应的日志信息因为默认情况下任务的日志是散落在nodemanager节点上的想要查看需要找到对应的nodemanager节点上去查看这样就很不方便通过日志聚合功能我们可以把之前本来散落在nodemanager节点上的日志统一收集到hdfs上的指定目录中这样就可以在yarn的web界面中直接查看了. 那我们就来开启日志聚合功能。开启日志聚合功能需要修改yarn-site.xml的配置增加 yarn.log-aggregation-enable和yarn.log.server.url这两个参数 propertynameyarn.log-aggregation-enable/namevaluetrue/value/propertypropertynameyarn.log.server.url/namevaluehttp://bigdata01:19888/jobhistory/logs//value /property注意修改这个配置想要生效需要重启集群。 启动historyserver进程需要在集群的所有节点上都启动这个进程 [rootbigdata01 hadoop-3.2.0]# bin/mapred --daemon start historyserver [rootbigdata01 hadoop-3.2.0]# jps 4232 SecondaryNameNode 5192 JobHistoryServer 4473 ResourceManager 3966 NameNode 5231 Jps [rootbigdata02 hadoop-3.2.0]# bin/mapred --daemon start historyserver [rootbigdata02 hadoop-3.2.0]# jps 2904 Jps 2523 NodeManager 2844 JobHistoryServer 2415 DataNode [rootbigdata03 hadoop-3.2.0]# bin/mapred --daemon start historyserver [rootbigdata03 hadoop-3.2.0]# jps 3138 JobHistoryServer 2678 NodeManager 2570 DataNode 3198 Jps重新再提交mapreduce任务 此时再进入yarn的8088界面点击任务对应的history链接就可以打开了 停止Hadoop集群中的任务 如果一个mapreduce任务处理的数据量比较大的话这个任务会执行很长时间可能几十分钟或者几个小时都有可能假设一个场景任务执行了一半了我们发现我们的代码写的有问题需要修改代码重新提交执行这个时候之前的任务就没有必要再执行了没有任何意义了最终的结果肯定是错误的所以我们就想把它停掉要不然会额外浪费集群的资源如何停止呢 我在提交任务的窗口中按ctrlc是不是就可以停止 注意了不是这样的我们前面说过这个任务是提交到集群执行的你在提交任务的窗口中执行ctrlc 对已经提交到集群中的任务是没有任何影响的。 我们可以验证一下执行ctrlc之后你再到yarn的8088界面查看会发现任务依然存在。 所以需要使用hadoop集群的命令去停止正在运行的任务 使用yarn application -kill命令后面指定任务id即可 [rootbigdata01 hadoop-3.2.0]# yarn application -kill application_15877135678 MapReduce程序扩展 MapReduce任务是由map阶段和reduce阶段组成的 但是我们也说过reduce阶段不是必须的那也就意味着MapReduce程序可以只包含map阶段。 什么场景下会只需要map阶段呢 当数据只需要进行普通的过滤、解析等操作不需要进行聚合这个时候就不需要使用reduce阶段了 在代码层面该如何设置呢 很简单在组装Job的时候设置reduce的task数目为0就可以了。并且Reduce代码也不需要写了。 package com.imooc.mr; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * 只有Map阶段不包含Reduce阶段 */ public class WordCountJobNoReduce { /** * 创建自定义mapper类 */ public static class MyMapper extends MapperLongWritable,Text,Text,LongWr Logger logger LoggerFactory.getLogger(MyMapper.class); /** * 需要实现map函数 * 这个map函数就是可以接收k1,v1 产生k2,v2 * param k1 * param v1 * param context * throws IOException * throws InterruptedException */ Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {//输出k1,v1的值//System.out.println(k1,v1k1.get(),v1.toString());logger.info(k1,v1k1.get(),v1.toString());// k1代表的是每一行的行首偏移量v1代表的是每一行内容// 对获取到的每一行数据进行切割把单词切割出来String[] words v1.toString().split( );// 迭代切割出来的单词数据for (String word:words) {// 把迭代出来的单词封装成k2,v2的形式Text k2 new Text(word);LongWritable v2 new LongWritable(1L);// 把k2,v2写出去context.write(k2,v2);}} }public static void main(String[] args) {try {if(args.length!2){// 如果传递的参数不够程序直接退出System.exit(100);}// job需要的配置参数Configuration conf new Configuration();// 创建一个jobJob job Job.getInstance(conf);// 注意这一行必须设置否则在集群中执行的是找不到WordCountJob这个类job.setJarByClass(WordCountJobNoReduce.class);// 指定输入路径(可以是文件也可以是目录)FileInputFormat.setInputPaths(job,new Path(args[0]));// 指定输出路径(只能指定一个不存在的目录)FileOutputFormat.setOutputPath(job,new Path(args[1]));// 指定map相关的代码job.setMapperClass(MyMapper.class);// 指定k2的类型job.setMapOutputKeyClass(Text.class);// 指定v2的类型job.setMapOutputValueClass(LongWritable.class);//禁用reduce阶段job.setNumReduceTasks(0);// 提交jobjob.waitForCompletion(true);}catch (Exception e){e.printStackTrace();} } }重新编译打包上传到bigdata01机器上 然后将最新的任务提交到集群上面注意修改入口类全类名 [rootbigdata01 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dep 2020-04-25 00:24:06,939 INFO mapreduce.Job: map 0% reduce 0% 2020-04-25 00:24:15,107 INFO mapreduce.Job: map 100% reduce 0% 2020-04-25 00:24:15,120 INFO mapreduce.Job: Job job_1587745397573_0001 complet这里发现map执行到100%以后任务就执行成功了reduce还是0%因为就没有reduce阶段了。 Shuffle过程详解 shuffer是一个网络拷贝的过程是指通过网络把数据从map端拷贝到reduce端的过程下面我们来详细分析一下这个过程。 首先看map阶段最左边有一个inputsplit最终会产生一个map任务map任务在执行的时候会把 k1,v1转化为k2,v2这些数据会先临时存储到一个内存缓冲区中这个内存缓冲区的大小默认是100Mio.sort.mb属性当达到内存缓冲区大小的80%io.sort.spill.percent也就是80M的时候会把内存中的数据溢写到本地磁盘中mapred.local.dir一直到map把所有的数据都计算完最后会把内存缓冲区中的数据一次性全部刷新到本地磁盘文件中在这个图里面表示产生了3个临时文件每个临时文件中有3个分区这是由于map阶段中对数据做了分区所以数据在存储的时候在每个临时文件中也划分为了3块最后需要对这些临时文件进行合并合并为一个大文件因为一个map任务最终只会产生一个文件这个合并之后的文件也是有3个分区的这3个分区的数据会被shuffle线程分别拷贝到三个不同的reduce节点图里面只显示了一个reduce节点下面还有两个没有显示。不同map任务中的相同分区的数据会在同一个reduce节点进行合并合并以后会执行reduce的功能最终产生结果数据。 在这里shuffle其实是横跨map端和reduce端的它主要是负责把map端产生的数据通过网络拷贝到 reduce阶段进行统一聚合计算。 Hadoop中序列化机制 在开发MapReduce程序的时候使用到了LongWritable和Text这些数据类型这些数据类型对应的是Java中的Long和String那MapReduce为什么不直接使用Java中的这些数据类型呢那肯定是嫌弃 Java中的这些数据类型使用起来不爽那具体不爽在什么地方呢 这个其实就涉及到序列化这个知识点了下面我们来分析一下来看这张图 我们的map阶段在读取数据的是需要从hdfs中读取的这里面需要经过磁盘IO和网络IO不过正常情况下map任务会执行本地计算也就是map任务会被分发到数据所在的节点进行计算这个时候网络io几乎就没有了就剩下了磁盘io再往后面看map阶段执行完了以后数据会被写入到本地磁盘文件这个时候也需要经过磁盘io后面的shuffle拷贝数据其实也需要先经过磁盘io把数据从本地磁盘读出来再通过网络发送到reduce节点再写入reduce节点的本地磁盘然后reduce阶段在执行的时候会经过磁盘io读取本地文件中的数据计算完成以后还会经过磁盘io和网络io把数据写入到hdfs中。 经过我们刚才的分析其实在这里面占得比重最高的是磁盘io所以说影响mapreduce任务执行效率的主要原因就是磁盘io如果想要提高任务执行效率就需要从这方面着手分析。 当程序在向磁盘中写数据以及从磁盘中读取数据的时候会对数据进行序列化和反序列化磁盘io这些步骤我们省略不了但是我们可以从序列化和反序列化这一块来着手做一些优化 首先我们分析一下序列化和反序列化看这个图当我们想把内存中的数据写入到文件中的时候会对数据序列化然后再写入这个序列化其实就是把内存中的对象信息转成二进制的形式方便存储到文件中默认java中的序列化会把对象及其父类、超类的整个继承体系信息都保存下来这样存储的信息太大了就会导致写入文件的信息过大这样写入是会额外消耗性能的。 反序列化也是一样reduce端想把文件中的对象信息加载到内存中如果文件很大在加载的时候也会额外消耗很多性能所以如果我们把对象存储的信息尽量精简那么就可以提高数据写入和读取消耗的性能。 基于此hadoop官方实现了自己的序列化和反序列化机制没有使用java中的序列化机制所以 hadoop中的数据类型没有沿用java中的数据类型而是自己单独设计了一些writable的实现了例如、longwritable、text等 那我们来看一下Hadoop中提供的常用的基本数据类型的序列化类 Java基本类型 Writable 序列化大小(字节) 布尔型boolean BooleanWritable 1 字节型byte ByteWritable 1 整型int IntWritable 4VIntWritable 1~5 浮点型float FloatWritable 4 长整型long LongWritable 8VLongWritable 1~9 双精度浮点型double DoubleWritable 8在这需要注意一下 Text等价于java.lang.String的Writable针对UTF-8序列NullWritable是单例获取实例使用NullWritable.get() 那下面我们来总结一下hadoop自己实现的序列化有什么特点 紧凑: 高效使用存储空间快速: 读写数据的额外开销小可扩展: 可透明地读取老格式的数据互操作: 支持多语言的交互 对应的我们也对java中序列化的不足之后做了一个总结 不精简附加信息多不太适合随机访问存储空间大递归地输出类的超类描述直到不再有超类 Java中的序列化和Hadoop中的序列化其实最主要的区别就是针对相同的数据Java中的序列化会占用较大的存储空间而Hadoop中的序列化可以节省很多存储空间这样在海量数据计算的场景下可以减少数据传输的大小极大的提高计算效率。 InputFormat分析 Hadoop中有一个抽象类是InputFormatInputFormat抽象类是MapReduce输入数据的顶层基类这个抽象类中只定义了两个方法 一个是getSplits方法另一个是createRecordReader方法 这个抽象类下面有三个子继承类 DBInputFormat是操作数据库的FileInputFormat是操作文件类型数据的DelegatingInputFormat是用在处理多个输入时使用的 这里面比较常见的也就是FileInputFormat了FileInputFormat是所有以文件作为数据源的基类 FileInputFormat保存job输入的所有文件并实现了对输入文件计算splits的方法至于获得文件中数据的方法是由子类实现的。 FileInputFormat下面还有一些子类 CombineFileInputFormat处理小文件问题的后面我们再详细分析TextInputFormat是默认的处理类处理普通文本文件他会把文件中每一行作为一个记录将每一行的起始偏移量作为key每一行的内容作为value这里的key和value就是我们之前所说的k1,v1它默认以换行符或回车键作为一行记录NLineInputFormat可以动态指定一次读取多少行数据 这里面的TextInputFormat是我们处理文本数据的默认处理类TextInputFormat的顶层基类是InputFormat下面我们先来看一下这个抽象类的源码 面试题 看几个面试题 一个1G的文件会产生多少个map任务 Block块默认是128M所以1G的文件会产生8个Block块 默认情况下InputSplit的大小和Block块的大小一致每一个InputSplit会产生一个map任务 所以1024/1288个map任务1000个文件每个文件100KB会产生多少个map任务 一个文件不管再小都会占用一个block所以这1000个小文件会产生1000个Block 那最终会产生1000个InputSplit也就对应着会产生1000个map任务一个140M的文件会产生多少个map任务 根据前面的分析 140M的文件会产生2个Block那对应的就会产生2个InputSplit了 注意这个有点特殊140M/128M1.093751.1 所以这个文件只会产生一个InputSplit也最终也就只会产生1个map 任务。 这个文件其实再稍微大1M就可以产生2个map 任务了。 OutputFormat分析 OutputFormat顾名思义这个是控制MapReduce输出的。 OutputFormat是输出数据的顶层基类FileOutputFormat文件数据处理基类TextOutputFormat默认文本文件处理类 这几个其实和InputFormat中的那几个文本处理类是对应着的当然了针对输出数据还有其它类型的处理类我们在这先分析最常见的文本文件处理类其他类型的等我们遇到具体场景再具体分析。 我们来看一下OutputFormat的源码这个类主要由三个方法 getRecordWriter checkOutputSpecs getOutputCommitter先看一下getRecordWriter这个方法的具体实现在TextOutputFormat中 public RecordWriterK, V getRecordWriter(TaskAttemptContext job ) throws IOException, InterruptedException {Configuration conf job.getConfiguration();//任务的输出数据是否需要压缩默认为false也就是不压缩boolean isCompressed getCompressOutput(job);//获取输出的key(k3)和value(v3)之间的分隔符,默认为\tString keyValueSeparator conf.get(SEPARATOR, \t);CompressionCodec codec null;String extension ;if (isCompressed) {Class? extends CompressionCodec codecClass getOutputCompressorClass(job, GzipCodec.class);codec ReflectionUtils.newInstance(codecClass, conf);extension codec.getDefaultExtension(); }//获取输出文件路径信息Path file getDefaultWorkFile(job, extension);FileSystem fs file.getFileSystem(conf);//获取文件输出流FSDataOutputStream fileOut fs.create(file, false);if (isCompressed) {return new LineRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);} else {//创建行书写器return new LineRecordWriter(fileOut, keyValueSeparator);} }
http://www.zqtcl.cn/news/528486/

相关文章:

  • 石家庄网站seo服务免费10大看盘软件
  • 自己做网站卖什么给个网站好人有好报2020免费
  • 网站源码安装步骤网站开发用c 语言
  • 网站首页是什么产品网络推广方案
  • 网站首页制作方案南通市规划建设局网站
  • 网站建设费用兴田德润团队西宁网站策划公司
  • 手机价格网站建设用别人备案域名做违法网站
  • 成都武侯区建设厅官方网站石家庄住房和城乡建设部网站
  • 前端做网站的步骤酉阳网站建设
  • 湖北省住房与建设厅网站php做网站访问记录
  • 做网站的公司没有技术吉林北京网站建设
  • 产品设计培训机构哪家好贵州整站优化seo平台
  • 天津网站制作推广wordpress 果酱
  • 写给初学网站开发们的一封信企业网站建设 ppt
  • 做装修网站多少钱做网站百度一下
  • 用asp做网站的可行性分析9免费建网站
  • 网站域名注册商查询徐州集团网站建设报价
  • 句容网站设计公司做网站充值犯法吗
  • 网站建设所用系统网站备案目的
  • 苏州做网站优化公司哪家好网站的大小
  • 四川省住房和城乡建设厅官方网站网站建设图标图片
  • 做影视网站侵权吗评论凡科网站建设怎么样
  • 建设个人网站流程建设游戏网站需要哪些设备
  • 四字母net做网站怎么样河南做网站优化
  • 怎样做网站快照网站当前位置怎么做
  • 网站模板移植现在c 做网站用什么框架
  • 国内专业的室内设计网站盐城网站开发代理商
  • 外贸网站建设 评价wordpress 函数调用
  • 广告支持模式的网站二级域名做网站域名
  • 空间 两个网站购物网站建设图标大全