网站的seo如何优化,炎陵做网站,公司网站 个人备案,新闻头条免费下载安装实训笔记7.25 7.25笔记一、MapReduce的特殊使用场景1.1 通过MapReduce程序实现多文件Join操作1.1.1 通过在Reduce端实现join操作1.1.2 通过在Map端实现join操作 1.2 MapReduce中的计数器的使用1.2.1 计数器使用两种方式 1.3 MapReduce实现数据清洗 二、MapReduce的OutputFormat… 实训笔记7.25 7.25笔记一、MapReduce的特殊使用场景1.1 通过MapReduce程序实现多文件Join操作1.1.1 通过在Reduce端实现join操作1.1.2 通过在Map端实现join操作 1.2 MapReduce中的计数器的使用1.2.1 计数器使用两种方式 1.3 MapReduce实现数据清洗 二、MapReduce的OutputFormat机制三、MapReduce整体流程涉及到一些核心组件四、MapReduce的调优相关知识点4.1 针对磁盘IO问题MR程序出现了一种压缩和解压缩机制可以解决MR程序运行中涉及到大量磁盘IO的问题4.1.1 常用的压缩算法的适用场景4.1.2 MapReduce程序可以压缩数据的位置4.1.3 在MapReduce中开启压缩机制 五、Hadoop的第三大组成--YARN框架5.1 YARN的基本架构组成5.1.1 ResourceManagerYARN集群的管理者5.1.2 NodeManager5.1.3 Container5.1.4 ApplicationMaster 5.2 YARN的详细工作流程--运行MapReduce 六、YARN的资源调度器问题6.1 YARN中一共三种的资源调度器6.1.1 FIFIO资源调度器6.1.2 容量调度器6.1.3 公平调度器6.1.4 修改yarn-site.xml 6.2 默认使用的容量调度器容量调度器可以有多个队列每一个队列占用集群的部分资源默认情况下容量调度只有一个队列default,队列占有集群的所有资源如果配置容量调度器的第二个队列capacity-scheduler.xml 七、YARN的web网站问题7.1 存在问题7.2 解决上述问题的方案 代码示例 7.25笔记
一、MapReduce的特殊使用场景
1.1 通过MapReduce程序实现多文件Join操作
1.1.1 通过在Reduce端实现join操作
核心思路是将多个文件读取之后以多文件的关联字段为key剩余为value发送给reducereduce通过关联字段将value聚合随后进行join操作
Reduce端join非常容易出现数据倾斜问题
1.1.2 通过在Map端实现join操作
核心思路将多文件中的这些小文件几十兆或者几十KB左右在驱动程序中把数据文件缓存起来只对大文件数据进行切片处理在map处理数据时现在setup方法中对缓存的小文件进行读取缓存缓存的时候以关联字段为key进行缓存
1.2 MapReduce中的计数器的使用
计数器在MapReduce中是用来统计分布式计算程序中一些感兴趣数据的一些数值。计数器MR程序运行中已经给我们提供了很多的计数器如果我们觉得这些计数中没有我们所需要的数据的数值我们可以自定义计数器去使用。
1.2.1 计数器使用两种方式 使用普通的字符串 context,getCounter(String groupName,String counterName).increment(num); 使用枚举类 context.getCounter(Enum的对象).increment(num)
1.3 MapReduce实现数据清洗
数据清洗就是我们把原始数据中一些不合法非法不感兴趣的数据清洗处理掉
因此数据清洗一般只需要map阶段即可在map阶段只需要对合法的数据进行context,write操作不合法的数据直接舍弃
二、MapReduce的OutputFormat机制
TextOutputFormat输出的是纯文本文档数据key-value之间以\t分割的一个kv使用占用一行
SequenceFileOutputFormat输出是一个SequenceFile文件格式的数据SequenceFile文件特殊在文件是一个普通的文件但是文件中的数据是二进制的并且可以被压缩的数据 文件没有被压缩只是数据被压缩了数据压缩还有三种模式none、record、block 默认情况下一个reduceTask输出一个文件文件名固定的part-r/m-xxxxx 自定义OutputFormat实现相关数据的写出
三、MapReduce整体流程涉及到一些核心组件
InputFormat组件 切片机制读取kv机制 Mapper组件处理一个切片的数据Partitioner组件map阶段的输出的数据计算分区使用WritableComparable组件进行输出数据排序三次排序Combiner组件可选组件进行map端输出数据的局部合并Reduce组件处理一个分区的数据聚合处理OutputFormat组件输出最终的结果数据
四、MapReduce的调优相关知识点
MapReduce运行中可能会产生很多影响MR计算效率的一些问题数据倾斜问题、大量的磁盘IO、小文件过多…
4.1 针对磁盘IO问题MR程序出现了一种压缩和解压缩机制可以解决MR程序运行中涉及到大量磁盘IO的问题
压缩和解压缩是MR程序提供的一种在Map输出或者reduce输出或者map输入之前可以通过指定的压缩算法对文件或者中间数据进行压缩这样的话可以减少磁盘IO的数据量如果我们在map的中间输出指定了压缩那么reduce拉取会数据之后会根据指定的压缩机制对压缩的数据进行解压缩。
压缩机制确实可以提升我们MR程序的运行效率但是也是有成本的压缩因为使用专门的算法算法越复杂压缩的时候程序的CPU的负载越大。
压缩适用于IO密集的MR程序计算密集的MR程序不适用
4.1.1 常用的压缩算法的适用场景 gzip 压缩的文件无法被MapReduce切片压缩效率和压缩速度都相对而言比较快如果一个文件压缩之后在128兆左右的话可以适用这个压缩机制 bzip2 压缩的文件支持切片的压缩效率很高但是压缩速度非常慢如果我们MR程序对时间要求不高但是数据量非常庞大的情况下 lzo 压缩的文件支持切片但是如果要支持切片是非常复杂的MR程序支持适用lzo算法但是MR程序没有自带这个算法 压缩效率不高胜在速度非常快 使用比较麻烦的因为Hadoop没有自带这个算法使用的话得需要下载插件引入依赖… snappy 压缩文件不支持切片压缩速度非常快是所有压缩算法中最快的了压缩的效率比gzip低
4.1.2 MapReduce程序可以压缩数据的位置
Map的输入采用一些支持切片的压缩机制bzip2、lzogzip和snappy也可以用只不过最好保证数据压缩之后在128兆左右map的输出snappy机制reduce的输出最好也是支持切片的压缩机制
4.1.3 在MapReduce中开启压缩机制
在MR中使用压缩机制不需要我们去进行手动的压缩和解压缩只需要在MR的合适的位置指定我们使用的是何种压缩机制MR程序会自动的调用设置的压缩和解压缩算法进行自动化操作。 mapper的输入开启压缩 只需要在Configuration或者core-site.xml文件增加如下一行配置即可 配置名io.compression.codecs
配置值org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.Lz4Codec,org.apache.hadoop.io.compress.SnappyCodec
只需要把上述配置配置好MR程序在处理输入文件时如果输入文件是上述配置的压缩的后缀mapper的输出可以开启压缩 mapreduce.map.output.compress true/false
mapreduce.map.output.compress.codec org.apache.hadoop.io.compress.GzipCodecreduce的输出可以开启压缩 FileOutputFormat.setCompressOutput(job,true);//是否开启输出压缩 FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);//reduce输出压缩使用的压缩机制.可以使用如下命令检查Hadoop集群目前本身不需要安装插件就支持的压缩算法
hadoop checknative
五、Hadoop的第三大组成–YARN框架
YARN是一个分布式资源调度系统专门用来给分布式计算程序提供计算资源的而且YARN只负责进行资源的提供不管计算程序的逻辑因此YARN这个软件非常的成功因为YARN不关注程序计算逻辑因此只要是分布式计算程序只要满足YARN的运行要求那么就可以在YARN上进行运行由YARN进行资源调度。spark、flink等等分布式计算程序都可以在YARN上运行。
5.1 YARN的基本架构组成
YARN之所以提供分布式计算资源主要原因就是因为YARN的设计架构
5.1.1 ResourceManagerYARN集群的管理者
1、负责进行资源的配置
2、负责整个集群的状态
3、接受客户端或者applicationmaster的资源申请
5.1.2 NodeManager
1、负责接受RM给NM分配的task任务就是资源的打包任务
2、负责启动Container容器打包的计算程序所需的运行资源 5.1.1~5.1.2YARN启动之后就会有的进程 5.1.3 Container
封装了一组计算资源的容器包含了计算程序所需的资源资源的具体的配额都是客户端或者ApplicationMaster去向RM申请
5.1.4 ApplicationMaster
任何一个分布式计算程序如果想在YARN上运行分布式计算程序必须能启动一个ApplicationMaster进程比如MR程序在YARN上运行就会启动MRAppcationMaster。这个进程不是由YARN自带的而是分布式计算程序想在YARN上运行分布式计算程序必须得有这么一个进程。
YARN的工作核心YARN之所以不知道分布式计算程序的计算逻辑还能给分布式计算程序提供资源全凭借ApplicationMaster的存在ApplicationMaster是分布式程序运行的核心监控分布式计算程序有没有运行成功、负责向RM申请分布式程序运行的资源。 5.1.3~5.1.4当有分布式计算程序在YARN上运行的时候才会出现这两个进程 5.2 YARN的详细工作流程–运行MapReduce 六、YARN的资源调度器问题
YARN在进行资源分配的时候RM需要先将client或者AM申请的资源初始化成为一个task任务资源的task任务不是直接下发给NM而是先把task任务给加入到一个RM的调度器当中由调度器在合适的时机下发任务给NM。
6.1 YARN中一共三种的资源调度器
6.1.1 FIFIO资源调度器
是一种队列调度器每一个任务加入到调度器中按照时间的先后依次排列给NM下发任务的时候是先来的先分配后来等待集群资源充足继续分配。 只有一个队列队列使用的集群中所有的资源
特点 如果有些任务比较重要必须排队只有得到队列中你排到了最前面了才会给你分配
Hadoop1.x版本YARN默认的调度器机制
6.1.2 容量调度器
也是一个队列调度器但是多个队列并行进行分配每一个队列具备YARN集群中的部分资源。在同一个时刻可以下发多个任务
Hadoop2.x和hadoop3.x默认调度器
6.1.3 公平调度器
也是可以具备多个队列每个队列具备集群中的部分资源不一样的地方在于每一个队列中的任务不等待每一个任务都会启动均匀的享有集群的资源。
6.1.4 修改yarn-site.xml
yarn.resourcemanager.scheduler.class
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler
6.2 默认使用的容量调度器容量调度器可以有多个队列每一个队列占用集群的部分资源默认情况下容量调度只有一个队列default,队列占有集群的所有资源如果配置容量调度器的第二个队列capacity-scheduler.xml
property nameyarn.scheduler.capacity.root.queues/name valuedefault,queueA/value description The queues at the this level (root is the root queue). /description
/property 容量调度器有几个队列
!-- default 队列占用的资源容量百分比 40% --
property nameyarn.scheduler.capacity.root.default.capacity/name value40/value /property !-- default 队列占用的最大资源容量百分比 60%--
property nameyarn.scheduler.capacity.root.default.maximum-capacity/name value60/value
/property如果要配置多个队列保证多个队列的capacity加起来是100每一个队列的最大占用容量要大于等于配置队列容量 七、YARN的web网站问题
YARN提供一个web网站yarn通过这个web网站可以查看YARN集群的资源信息和队列信息以及可以查看YARN上运行的分布式计算程序的状态以及运行的日志输出
7.1 存在问题
YARN记录的分布式运行程序只是本次开启有效如果YARN关闭重启了那么以前在YARN上运行的日志全部消失了YARN记录的分布式运行程序在网站上看不到详细的日志信息因此后期维护或者查看MR运行信息就很麻烦了
7.2 解决上述问题的方案
第一步配置MapReduce的历史服务器JobHistory可以帮助YARN记忆以前开启的时候运行的MR程序 历史服务器的配置主要在mapred-site.xml文件中配置主要配置两项
property namemapreduce.jobhistory.address/name valuesingle:10020/value /property property namemapreduce.jobhistory.webapp.address/name valuesingle:19888/value
/property如果使用历史服务器必须启动历史服务器如果不启动历史服务器不会记录YARN上运行的分布式计算程序 mr-jobhistory-daemon.sh start historyserver 第二步配置YARN聚合MapReduce运行日志信息–可以在YARN的web界面查看MR的详细日志 配置yarn-site.xml文件
!-- 日志聚集功能启动 --
property nameyarn.log-aggregation-enable/name valuetrue/value /property
!-- 日志保留时间设置7天 --
property nameyarn.log-aggregation.retain-seconds/name value604800/value
/property
property nameyarn.log.server.url/name valuehttp://single:19888/jobhistory/logs/value
/property代码示例
package com.sxuek.wordcount;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;public class WCDriver {public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {//1、准备一个配置文件对象Configuration configuration new Configuration();configuration.set(fs.defaultFS,hdfs://192.168.68.101:9000);//2、创建一个封装MR程序使用Job对象Job job Job.getInstance(configuration);job.setJarByClass(WCDriver.class);//指定输入文件路径 输入路径默认是本地的如果你想要是HDFS上的 那么必须配置fs.defaultFS 指定HDFS的路径FileInputFormat.setInputPaths(job,new Path(/wordcount.txt));/*** 4、封装Mapper阶段*/job.setMapperClass(WCMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);/*** 6、封装Reducer阶段*/job.setReducerClass(WCReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);job.setNumReduceTasks(1);/*** 7、封装指定的OutputFormat,如果没有指定OutputFormat 默认使用TextOutputFormat*/Path path new Path(/output);FileSystem fs FileSystem.get(new URI(hdfs://192.168.68.101:9000), configuration, root);if (fs.exists(path)){fs.delete(path,true);}job.setOutputFormatClass(WCOutputFormat.class);FileOutputFormat.setOutputPath(job,path);/*** 8、提交程序运行* 提交的时候先进行切片规划然后将配置和代码提交给资源调度器*/boolean b job.waitForCompletion(true);System.exit(b?0:1);}
}class WCMapper extends MapperLongWritable, Text,Text,LongWritable{Overrideprotected void map(LongWritable key, Text value, MapperLongWritable, Text, Text, LongWritable.Context context) throws IOException, InterruptedException {String line value.toString();String[] words line.split( );for (String word : words) {context.write(new Text(word),new LongWritable(1L));}}
}class WCReducer extends ReducerText,LongWritable,Text,LongWritable{Overrideprotected void reduce(Text key, IterableLongWritable values, ReducerText, LongWritable, Text, LongWritable.Context context) throws IOException, InterruptedException {long sum 0l;for (LongWritable value : values) {sum value.get();}context.write(key,new LongWritable(sum));}
}class WCOutputFormat extends FileOutputFormatText,LongWritable{Overridepublic RecordWriterText, LongWritable getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {return new WCRecordWriter();}
}class WCRecordWriter extends RecordWriterText,LongWritable{private Connection connection;private PreparedStatement preparedStatement;public WCRecordWriter(){/*** 在无参构造器中先连接上MySQL*/try {Class.forName(com.mysql.cj.jdbc.Driver);connection DriverManager.getConnection(jdbc:mysql://localhost:3306/mr?serverTimezoneUTCuseUnicodetruecharacterEncodingUTF-8,root,root);String sql insert into wordcount(word,count) values(?,?);preparedStatement connection.prepareStatement(sql);} catch (ClassNotFoundException e) {throw new RuntimeException(e);} catch (SQLException e) {throw new RuntimeException(e);}}Overridepublic void write(Text key, LongWritable value) throws IOException, InterruptedException {String word key.toString();Long count value.get();try {preparedStatement.setString(1,word);preparedStatement.setInt(2,count.intValue());preparedStatement.executeUpdate();} catch (SQLException e) {throw new RuntimeException(e);}}Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {if (preparedStatement ! null){try {preparedStatement.close();} catch (SQLException e) {throw new RuntimeException(e);}}if (connection ! null){try {connection.close();} catch (SQLException e) {throw new RuntimeException(e);}}}
}package com.sxuek.compress;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;/*** 压缩和解压缩其实就是使用IO流的形式对数据读取和写出* Hadoop的default压缩算法的使用* 165s* 96%*/
public class Demo01 {public static void main(String[] args) throws IOException {DefaultCodec defaultCodec ReflectionUtils.newInstance(DefaultCodec.class, new Configuration());FileOutputStream fos new FileOutputStream(f://CentOS-7-x86_64-DVD-1708.isodefaultCodec.getDefaultExtension());CompressionOutputStream outputStream defaultCodec.createOutputStream(fos);FileInputStream fis new FileInputStream(f://CentOS-7-x86_64-DVD-1708.iso);long time System.currentTimeMillis();IOUtils.copyBytes(fis,outputStream,1*1024*1024);long time1 System.currentTimeMillis();System.out.println(time1-time);}
}
package com.sxuek.compress;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;/*** gzip* 179s* 96%*/
public class Demo02 {public static void main(String[] args) throws IOException {GzipCodec gzipCodec ReflectionUtils.newInstance(GzipCodec.class, new Configuration());FileOutputStream fos new FileOutputStream(f://CentOS-7-x86_64-DVD-1708.isogzipCodec.getDefaultExtension());CompressionOutputStream outputStream gzipCodec.createOutputStream(fos);FileInputStream fis new FileInputStream(f://CentOS-7-x86_64-DVD-1708.iso);long time System.currentTimeMillis();IOUtils.copyBytes(fis,outputStream,1*1024*1024);long time1 System.currentTimeMillis();System.out.println(time1-time);}
}
package com.sxuek.compress;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;import java.io.FileInputStream;
import java.io.FileOutputStream;/*** bzip*/
public class Demo03 {public static void main(String[] args) throws Exception {BZip2Codec bZip2Codec ReflectionUtils.newInstance(BZip2Codec.class, new Configuration());FileOutputStream fos new FileOutputStream(f://CentOS-7-x86_64-DVD-1708.isobZip2Codec.getDefaultExtension());CompressionOutputStream outputStream bZip2Codec.createOutputStream(fos);FileInputStream fis new FileInputStream(f://CentOS-7-x86_64-DVD-1708.iso);long time System.currentTimeMillis();IOUtils.copyBytes(fis,outputStream,1*1024*1024);long time1 System.currentTimeMillis();System.out.println(time1-time);}
}
package com.sxuek.compress;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.util.ReflectionUtils;import java.io.FileInputStream;
import java.io.FileOutputStream;/*** snappy*/
public class Demo04 {public static void main(String[] args) throws Exception{SnappyCodec snappyCodec ReflectionUtils.newInstance(SnappyCodec.class, new Configuration());FileOutputStream fos new FileOutputStream(f://CentOS-7-x86_64-DVD-1708.isosnappyCodec.getDefaultExtension());CompressionOutputStream outputStream snappyCodec.createOutputStream(fos);FileInputStream fis new FileInputStream(f://CentOS-7-x86_64-DVD-1708.iso);long time System.currentTimeMillis();IOUtils.copyBytes(fis,outputStream,1*1024*1024);long time1 System.currentTimeMillis();System.out.println(time1-time);}
}