当前位置: 首页 > news >正文

调颜色网站宣传片拍摄服务

调颜色网站,宣传片拍摄服务,wordpress主题修改ftp,品牌设计vi设计公司目录 1 MapReduce 的数据流1.1 数据流走向1.2 InputFormat 数据输入1.2.1 FileInputFormat 切片源码、机制1.2.2 TextInputFormat 读数据源码、机制1.2.3 CombineTextInputFormat 切片机制 1.3 OutputFormat 数据输出1.3.1 OutputFormat 实现类1.3.2 自定义 OutputFormat 2 Map… 目录 1 MapReduce 的数据流1.1 数据流走向1.2 InputFormat 数据输入1.2.1 FileInputFormat 切片源码、机制1.2.2 TextInputFormat 读数据源码、机制1.2.3 CombineTextInputFormat 切片机制 1.3 OutputFormat 数据输出1.3.1 OutputFormat 实现类1.3.2 自定义 OutputFormat 2 MapReduce 框架原理2.1 MapTask 工作机制2.2 ReduceTask 工作机制2.3 MapTask 并行度决定机制2.4 ReduceTask 并行度决定机制2.5 Shuffle 机制2.5.1 Shuffle 机制流程2.5.2 Paratition 分区2.5.3 WritableComparable 排序2.5.4 Combiner 合并 2.6 MapReduce 工作流程 3 Join 应用3.1 Reduce Join3.2 Map Join 1 MapReduce 的数据流 1.1 数据流走向 查看 MapTask 源码中的 run()方法 查看 ReduceTask 源码中的 run() 1.2 InputFormat 数据输入 InputFormat的体系结构 FileInputFormatInputFormat 的子实现类重写了抽象方法 getSplits()实现切片逻辑 TextInputFormat FileInputFormat 的子实现类重写了抽象方法 createRecordReader() 实现读取数据的逻辑 CombineFileInputFormatFileInputFormat 的子实现类此类中也实现了一套切片逻辑 适用于小文件计算场景 1.2.1 FileInputFormat 切片源码、机制 FileInputFormat.java 是InputFormat.java的实现类该文件中中重写了抽象方法 getSplits()实现切片逻辑 看源码时通过ctrl点击进入某些方法的具体实现有时源码位置的跳跃性很大 IDEA中可以CTRLALT然后通过键盘中的左右箭头实现源码中定位的来回切换 public ListInputSplit getSplits(JobContext job) throws IOException {// 计时器开始StopWatch sw new StopWatch().start();// getFormatMinSplitSize()返回1// getMinSplitSize(job)配置文件中的配置默认配置为0可以通过修改mapreduce.input.fileinputformat.split.minsize配置项来改变minSize的大小// 故minSize默认返回为1long minSize Math.max(getFormatMinSplitSize(), getMinSplitSize(job));// hadoop中默认没有配置mapreduce.input.fileinputformat.split.maxsize// 故maxSize默认为Long类型的最大值long maxSize getMaxSplitSize(job);// 管理最终切完片的对象的集合最终返回的就是此集合ListInputSplit splits new ArrayListInputSplit();// 获取当前文件的详情ListFileStatus files listStatus(job);boolean ignoreDirs !getInputDirRecursive(job) job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);//遍历获取到的文件列表依次以文件为单位进行切片for (FileStatus file: files) {// 如果是忽略文件以及是文件夹就不进行切片if (ignoreDirs file.isDirectory()) {continue;}// 获取文件的当前路径Path path file.getPath();// 获取文件的大小long length file.getLen();// 如果不是空文件if (length ! 0) {// 获取文件的具体的块信息BlockLocation[] blkLocations;if (file instanceof LocatedFileStatus) {blkLocations ((LocatedFileStatus) file).getBlockLocations();} else {FileSystem fs path.getFileSystem(job.getConfiguration());blkLocations fs.getFileBlockLocations(file, 0, length);}// 判断是否要进行切片主要判断当前文件是否是压缩文件有一些压缩文件时不能够进行切片if (isSplitable(job, path)) {// 获取hdfs中数据块的大小long blockSize file.getBlockSize();// 计算切片的大小-- 128M 默认情况下永远都是块大小long splitSize computeSplitSize(blockSize, minSize, maxSize);// 内部方法// protected long computeSplitSize(long blockSize, long minSize,// long maxSize) {// return Math.max(minSize, Math.min(maxSize, blockSize));// }// 判断当前的文件的剩余内容是否要继续切片 SPLIT_SLOP 1.1// 判断公式bytesRemaining)/splitSize SPLIT_SLOP// 用文件的剩余大小/切片大小 1.1 才继续切片这样做的目的是为了让我们每一个MapTask处理的数据更加均衡long bytesRemaining length;while (((double) bytesRemaining)/splitSize SPLIT_SLOP) {int blkIndex getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));bytesRemaining - splitSize;}// 如果最后文件还有剩余且不足一个切片大小最后再形成最后的一个切片if (bytesRemaining ! 0) {int blkIndex getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));}} else { // not splitableif (LOG.isDebugEnabled()) {// Log only if the file is big enough to be splittedif (length Math.min(file.getBlockSize(), minSize)) {LOG.debug(File is not splittable so no parallelization is possible: file.getPath());}}splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),blkLocations[0].getCachedHosts()));}} else { //Create empty hosts array for zero length filessplits.add(makeSplit(path, 0, length, new String[0]));}}// Save the number of input files for metrics/loadgenjob.getConfiguration().setLong(NUM_INPUT_FILES, files.size());sw.stop();if (LOG.isDebugEnabled()) {LOG.debug(Total # of splits generated by getSplits: splits.size() , TimeTaken: sw.now(TimeUnit.MILLISECONDS));}return splits;}Math.max(minSize, Math.min(maxSize, blockSize)); maxsize切片最大值参数如果比blockSize小则会让切片变小而且就等于这个配置的这个参数的值minsize切片最小值参数如果比blockSize大则可以让切片的大小比blockSize还要大 1.2.2 TextInputFormat 读数据源码、机制 TextInputFormat.java 是FileInputFormat.java 的实现类该文件中重写了抽象方法 createRecordReader()实现读取数据的逻辑。 按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量LongWritable类型值是这行的内容不包括任何行终止符换行符、回车符Text类型。 1.2.3 CombineTextInputFormat 切片机制 框架默认的 TextInputFormat 切片机制是对任务按文件规划切片不管文件多小都会是一个单独的切片都会交给一个 MapTask这样如果有大量小文件就会产生大量的MapTask处理效率极其低下。 CombineTextInputFormat.java 也是 FileInputFormat.java 的实现类 适用场景 用于小文件过多的场景它可以将多个小文件从逻辑上规划到一个切片中这样多个小文件就可以交给一个 MapTask 处理。 虚拟存储切片最大值设置 CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4M虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。 切片机制 生成切片过程包括虚拟存储过程和切片过程两部分 1虚拟存储过程 将输入目录下所有文件大小依次和设置的 setMaxInputSplitSize 值比较如果不大于设置的最大值逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍那么以最大值切割一块当剩余数据大小超过设置的最大值且不大于最大值2倍此时将文件均分成2个虚拟存储块防止出现太小切片。 例如 setMaxInputSplitSize 值为4M输入文件大小为 8.02M则先逻辑上分成一个 4M。剩余的大小为4.02M如果按照4M逻辑划分就会出现 0.02M 的小的虚拟存储文件所以将剩余的4.02M文件切分成2.01M和2.01M两个文件。 2切片过程 a. 判断虚拟存储的文件大小是否大于 setMaxInputSplitSize 值大于 则单独形成一个切片。 b. 如果不大于则跟下一个虚拟存储文件进行合并共同形成一个切片。 c. 测试举例有4个小文件大小分别为 1.7M、5.1M、3.4M 以及 6.8M 这四个小文件则虚拟存储之后形成6个文件块大小分别为 1.7M2.55M、2.55M3.4M以及3.4M、3.4M 最终会形成3个切片大小分别为 1.72.55M2.553.4M3.43.4M 3案例实操 将输入的大量小文件合并成一个切片统一处理。 准备4个小文件 a. 不做任何处理运行前面的 WordCount 案例程序观察切片个数为 4 。 b. 在WordcountDriver.java中增加如下代码运行程序并观察运行的切片个数为 3。 // 如果不设置InputFormat它默认用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class);//虚拟存储切片最大值设置4M CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);c. 在 WordcountDriver.java 中增加如下代码运行程序并观察运行的切片个数为 1 // 如果不设置InputFormat它默认用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class);//虚拟存储切片最大值设置20M CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);1.3 OutputFormat 数据输出 1.3.1 OutputFormat 实现类 OutputFormat 是 MapReduce 输出的基类所有 MapReduce 输出都实现了 OutputFormat 接口以下是几种常见的OutputFormat 实现类。 TextOutputFormat 是默认的输出格式它把每条记录写为文本行它的键和值可以是任意类型因为TextOutputFormat 可以调用 toString() 方法把它们转为字符串。SequenceOutputFormat 将 SequenceOutputFormat 的输出作为后续 MapReduce的输入这便是一种好的输出格式。因为它的它的格式紧凑很容易被压缩。自定义OutputFormat 根据用户的需求自定义输出格式。 OutputFormat 是 FileOutputFormat 的父类FileOutputFormat 又是 TextOutputFormat 的父类。 1.3.2 自定义 OutputFormat 1 需求 过滤输入的 log 日志包含atguigu的网站输出到 atguigu.log不包含 atguigu 的网站输出到 other.log。 log.txt http://www.baidu.com http://www.google.com http://cn.bing.com http://www.atguigu.com http://www.sohu.com http://www.sina.com http://www.sin2a.com http://www.sin2desa.com http://www.sindsafa.com2代码编写 LogMapper.java package com.huwei.mr.outputformat;import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class LogMapper extends MapperLongWritable, Text, Text, NullWritable {Overrideprotected void map(LongWritable key, Text value, MapperLongWritable, Text, Text, NullWritable.Context context) throws IOException, InterruptedException {// 直接写出context.write(value, NullWritable.get());} }LogReducer.java package com.huwei.mr.outputformat;import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class LogReducer extends ReducerText, NullWritable, Text, NullWritable {Overrideprotected void reduce(Text key, IterableNullWritable values, ReducerText, NullWritable, Text, NullWritable.Context context) throws IOException, InterruptedException {// 遍历直接写出for (NullWritable value : values) {context.write(key, NullWritable.get());}} }自定义 LogOutputFormat.java package com.huwei.mr.outputformat;import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException; /*** 自定义的LogOutputFormat需要继承Hadoop提供的OutputFormat*/ public class LogOutputFormat extends FileOutputFormatText, NullWritable {/*** 返回一个RecordWriter对象* param job* return* throws IOException* throws InterruptedException*/Overridepublic RecordWriterText, NullWritable getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {LogRecordWriter lrw new LogRecordWriter(job);return lrw;} }自定义的 LogRecordWriter.java package com.huwei.mr.outputformat;import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;/*** 自定义的LogRecordWriter需要继承Hadoop提供的RecordWriter*/ public class LogRecordWriter extends RecordWriterText, NullWritable {// 定义输出路径private String atguiguPath E:\\hadoop\\out\\logs\\atguigu.txt;private String otherPath E:\\hadoop\\out\\logs\\other.txt;private FileSystem fs;private FSDataOutputStream atguigu;private FSDataOutputStream other;/*** 初始化工作** param job*/public LogRecordWriter(TaskAttemptContext job) throws IOException {// 获取Hadoop文件系统对象fs FileSystem.get(job.getConfiguration());// 获取输出流atguigu fs.create(new Path(atguiguPath));// 获取输出流other fs.create(new Path(otherPath));}/*** 实现数据写出的逻辑** param text* param nullWritable* throws IOException* throws InterruptedException*/Overridepublic void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {// 获取当前输入的数据String logData text.toString();if (logData.contains(atguigu)) {atguigu.writeBytes(logData \n);} else {other.writeBytes(logData \n);}}/*** 关闭资源** param taskAttemptContext* throws IOException* throws InterruptedException*/Overridepublic void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {IOUtils.closeStream(atguigu);IOUtils.closeStream(other);} } LogDriver.java package com.huwei.mr.outputformat;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class LogDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 声明配置对象Configuration conf new Configuration();// 声明Job对象Job job Job.getInstance(conf);// 指定当前Job的驱动类job.setJarByClass(LogDriver.class);// 指定当前Job的Mapperjob.setMapperClass(LogMapper.class);// 指定当前Job的Reducerjob.setReducerClass(LogReducer.class);// 指定Map端输出数据的key的类型和输出数据value的类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);// 指定最终Reduce端输出数据的key的类型和输出数据value的类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 指定自定义的 OutputFormatjob.setOutputFormatClass(LogOutputFormat.class);// 指定输入数据的路径和输出数据的路径FileInputFormat.setInputPaths(job,new Path(E:\\hadoop\\in\\log));FileOutputFormat.setOutputPath(job,new Path(E:\\hadoop\\out\\log));// 提交Job// 参数代表是否监控提交过程job.waitForCompletion(true);} } 如何实现OutputFormat自定义 自定义一个 OutputFormat 类继承Hadoop提供的OutputFormat在该类中实现 getRecordWriter() ,返回一个RecordWriter自定义一个 RecordWriter 并且继承Hadoop提供的RecordWriter类在该类中 重写 write() 和 close() 在这些方法中完成自定义输出。 2 MapReduce 框架原理 2.1 MapTask 工作机制 1Read 阶段MapTask 通过 InputFormat 获得的 RecordReader从输入InputSplit 中解析出一个个key/value。 2Map 阶段该节点主要是将解析出的key/value交给用户编写map()函数处理并产生一系列新的 key/value。 3Collect 收集阶段在用户编写 map() 函数中当数据处理完成后一般会调用 OutputCollector.collect() 输出结果。在该函数内部它会将生成的 key/value 分区调用Partitioner并写入一个环形内存缓冲区中。 4Spill 阶段即“溢写”当环形缓冲区满后MapReduce 会将数据写到本地磁盘上生成一个临时文件。需要注意的是将数据写入本地磁盘之前先要对数据进行一次本地排序并在必要时对数据进行合并、压缩等操作。 溢写阶段详情 步骤1利用快速排序算法对缓存区内的数据进行排序排序方式是先按照分区编号 Partition 进行排序然后按照 key 进行排序。这样经过排序后数据以分区为单位聚集在一起且同一分区内所有数据按照key有序。步骤2按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.outN表示当前溢写次数中。如果用户设置了Combiner则写入文件之前对每个分区中的数据进行一次聚集操作。步骤3将分区数据的元信息写到内存索引数据结构 SpillRecord 中其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB则将内存索引写到文件 output/spillN.out.index 中。 5Merge阶段当所有数据处理完成后MapTask 对所有临时文件进行一次合并以确保最终只会生成一个数据文件。 当所有数据处理完后MapTask 会将所有临时文件合并成一个大文件并保存到文件output/file.out 中同时生成相应的索引文件 output/file.out.index。 在进行文件合并过程中MapTask 以分区为单位进行合并。对于某个分区它将采用多轮递归合并的方式。每轮合并 mapreduce.task.io.sort.factor默认10个文件并将产生的文件重新加入待合并列表中对文件排序后重复以上过程直到最终得到一个大文件。 让每个 MapTask 最终只生成一个数据文件可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。 2.2 ReduceTask 工作机制 1Copy 阶段ReduceTask 从各个 MapTask 上远程拷贝一片数据并针对某一片数据如果其大小超过一定阈值则写到磁盘上否则直接放到内存中。 2Merge 阶段在远程拷贝数据的同时ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并以防止内存使用过多或磁盘上文件过多。 3Sort 阶段按照 MapReduce 语义用户编写 reduce() 函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序因此ReduceTask 只需对所有数据进行一次归并排序即可。 4Reduce 阶段对排序后的键值对调用 reduce() 方法键相同的键值对调用一次reduce() 方法。 5Write 阶段reduce()函数将计算结果写到 HDFS上。 2.3 MapTask 并行度决定机制 MapTask的并行度决定Map阶段的任务处理并发度进而影响到整个Job的处理速度。 数据块Block是 HDFS 物理上把数据分成一块一块。数据块是 HDFS 存储数据单位。 数据切片数据切片只是在逻辑上对输入进行分片并不会在磁盘上将其切分成片进行存储。数据切片是 MapReduce 程序计算输入数据的单位一个切片会对应启动一个MapTask。 一个Job的Map阶段并行度MapTask 由客户端在提交Job时的切片数决定一个切片就会产生一个MapTask并行处理默认情况下切片大小BlockSize128M这样设计的目的是为了避免将来切片读取数据的时候有跨机器读取数据 的情况这样效率是很低的切片时不考虑整体数据集而是逐个针对每一个文件单独切片 2.4 ReduceTask 并行度决定机制 回顾MapTask并行度由切片个数决定切片个数由输入文件和切片规则决定。 思考ReduceTask并行度由谁决定 ReduceTask 的并行度同样影响整个Job的执行并发度和执行效率但与 MapTask 的并发数由切片数决定不同ReduceTask 数量的决定是可以直接手动设置的。 // 默认值是1手动设置为4 job.setNumReduceTasks(4);1ReduceTask 0表示没有 Reduce 阶段输出文件的个数和 Map 个数一致。 2ReduceTask 的默认值就是1所以输出文件的个数为1个。 3如果数据分布不均匀就有可能在Reduce阶段产生数据倾斜。 4ReduceTask 数量不是任意设置的还要考虑业务逻辑需求在有些情况下需要计算全局汇总结果就只能有一个ReduceTask 。 5具体多少个ReduceTask 需要根据集群的性能而定。 6如果分区数不是1但是 ReduceTask 为1是否执行分区过程答案是不执行因为在 MapTask 的源码中执行分区的前提是先判断 ReduceNum 是否大于1不大于1则不执行。 2.5 Shuffle 机制 2.5.1 Shuffle 机制流程 Map 方法之后Reduce 方法之前的数据处理过程称之为 Shuffle 。 Shuffle 机制流程 图中的Map1方法不要理解为 Mapper 中重写的 map 方法把它看成一个 MapTask 的执行一个 MapTask 是会调用多个 map 方法的环形缓冲区默认大小为100M其实就是在内存中其中每一个分区内部所使用的排序算法是快速排序每个相同分区之间采用的是归并排序在磁盘上进行当环形缓冲区的数据量达到自身容量的 80%会发生第一次溢写 1MapTask收集我们的map()方法输出的 kv 对放到内存缓冲区中 2从内存缓冲区不断溢出本地磁盘文件可能会溢出多个文件 3多个溢出文件会被合并成大的溢出文件 4在溢出过程及合并的过程中都要调用Partitioner进行分区和针对key进行排序 5ReduceTask根据自己的分区号去各个MapTask机器上取相应的结果分区数据 6ReduceTask会抓取到同一个分区的来自不同MapTask的结果文件ReduceTask会将这些文件再进行合并归并排序 7合并成大文件后Shuffle的过程也就结束了后面进入ReduceTask的逻辑运算过程从文件中取出一个一个的键值对Group调用用户自定义的reduce()方法 注意 Shuffle中的缓冲区大小会影响到 MapReduce 程序的执行效率原则上说缓冲区越大则可以容纳更多的数据并减少写入磁盘的次数磁盘IO的次数越少执行速度就越快。缓冲区的大小可以通过参数调整参数mapreduce.task.io.sort.mb 默认100M。 2.5.2 Paratition 分区 要求将统计结果按照条件输出到不同的文件分区中。比如将统计结果按照手机归属地不同省份输出到不同的文件中分区。 1Hadoop默认的分区规则源码解析 定位 MapTask 的 map 方法中 context.write(outk, outv);跟到 write(outk, outv) 中进入到 ChainMapContextImpl 类的实现中 public void write(KEYOUT key, VALUEOUT value) throws IOException,InterruptedException {output.write(key, value);}跟到 output.write(key, value) 内部实现类 NewOutputCollector public void write(K key, V value) throws IOException, InterruptedException {collector.collect(key, value,partitioner.getPartition(key, value, partitions)); }重点理解 partitioner.getPartition(key, value, partitions); 跟进默认的分区规则实现类 HashPartitioner public int getPartition(K key, V value,int numReduceTasks) { // 根据当前的key的hashCode值和ReduceTask的数量进行取余操作// 获取到的值就是当前kv所属的分区编号。return (key.hashCode() Integer.MAX_VALUE) % numReduceTasks; }Partitioner是 Hadoop 的分区器对象负责给 Map 阶段输出数据选择分区的功能。 默认分区是根据 key 的 hashCode 对 ReduceTask 的个数取模得到的数字编号这个分区编号在Job提交的时候就已经定义好了。用户没法控制哪个 key 存储到哪个分区。 2自定义分区规则 将统计结果按照手机归属地不同省份输出到不同文件分区中。 使用在 大数据技术学习笔记五—— MapReduce12.3小节案例的数据 phone_data.txt手机号136、137、138、139开头都分别放到一个独立的4个文件中其他开头的放到一个文件中。 在案例 2.3 的基础上增加一个分区类 //自定义一个分区器对象需要继承Hadoop提供的Partitioner对象 //这里的泛型就是Mapper输出的泛型 public class FlowPartitioner extends PartitionerText, FlowBean {/*** 定义当前kv所属分区的规则** param text the key to be partioned.* param flowBean the entry value.* param numPartitions the total number of partitions.* 分区* 136 —— 0* 137 —— 1* 138 —— 2* 139 —— 3* 其他 —— 4*/Overridepublic int getPartition(Text text, FlowBean flowBean, int numPartitions) {int phonePartition;// 获取手机号String phoneNum text.toString();if(phoneNum.startsWith(136)){phonePartition0;} else if (phoneNum.startsWith(137)) {phonePartition1;}else if (phoneNum.startsWith(138)) {phonePartition2;}else if (phoneNum.startsWith(139)) {phonePartition3;}else {phonePartition4;}return phonePartition;} }在驱动函数中增加自定义数据分区设置和 ReduceTask 设置 // 指定ReduceTask的数量 job.setNumReduceTasks(5); // 指定自定义的分区器对象实现 job.setPartitionerClass(FlowPartitioner.class);分区器使用时注意事项 当 ReduceTask 的数量设置的分区数 getPartition的结果数实际用到的分区数 此时会生成空的分区文件当 ReduceTask 的数量设置的分区数 getPartition的结果数实际用到的分区数 导致有一部分数据无处安放此时会报错当 ReduceTask 的数量设置的分区数 1 则不管 MapTask 端输出多少个分区文件最终结果文件会输出到一个文件part-r-00000中分区编号生成的规则根据指定的ReduceTask的数量 从 0 开始依次累加。 2.5.3 WritableComparable 排序 排序是MR最重要的操作之一。 MapTask 和 ReduceTask 均会对数据按照 key 进行排序。该操作属于 Hadoop 的默认行为。任何应用程序中的数据均会被排序而不管逻辑上是否需要。 默认排序是按照字典顺序排序且实现该排序的方法是快速排序。 对于 MapTask它会将处理的结果暂时存放到环形缓冲区中当环形缓冲区使用率达到一定的阈值后再对缓冲区中的数据进行一次快速排序并将这些有序数据溢写到磁盘上而当数据处理完毕后它会对磁盘上所有的文件进行归并排序。 对于ReduceTask它从每个 MapTask上远程拷贝相应的数据文件如果文件大小超过一定阈值则溢写到磁盘上否则存储在内存中。如果磁盘上文件数目达到一定的阈值则进行一次归并排序以生成一个更大的文件如果内存中文件大小或文件数目超过一定的阈值则进行一次合并后将数据溢写到磁盘上。当所有的数据拷贝完成后ReduceTask统一对内存和磁盘上所有的数据进行一次归并排序。 排序分类 部分排序MapReduce根据输入记录的键对数据集排序。保证每个输出文件内部有序。全排序最终的输出结果只有一个文件且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低因为一台机器处理所有的文件完全丧失了 MapReduce 所提供的并行架构。二次排序在自定义排序过程中如果 compareTo 中的判断条件为两个即为二次排序。 这里仍然使用在 大数据技术学习笔记五—— MapReduce12.3小节案例 代码编写 FlowBean.java package com.huwei.mr.sort;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;public class FlowBean implements WritableComparableFlowBean {private Integer upFlow;private Integer downFlow;private Integer sumFlow;// 默认有无参构造方法public Integer getUpFlow() {return upFlow;}public void setUpFlow(Integer upFlow) {this.upFlow upFlow;}public Integer getDownFlow() {return downFlow;}public void setDownFlow(Integer downFlow) {this.downFlow downFlow;}public Integer getSumFlow() {return sumFlow;}public void setSumFlow(Integer sumFlow) {this.sumFlow sumFlow;}Overridepublic String toString() {return FlowBean{ upFlow upFlow , downFlow downFlow , sumFlow sumFlow };}/*** 序列化方法*/Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeInt(upFlow);dataOutput.writeInt(downFlow);dataOutput.writeInt(sumFlow);}/*** 反序列化方法* 顺序要和序列化方法一致*/Overridepublic void readFields(DataInput dataInput) throws IOException {upFlow dataInput.readInt();downFlow dataInput.readInt();sumFlow dataInput.readInt();}// 计算上下行流量之和public void setSumFlow() {this.sumFlow this.upFlow this.downFlow;}/*** 自定义排序规则* 需求根据总流量倒序* param o the object to be compared.* return*/Overridepublic int compareTo(FlowBean o) {//按照总流量比较,倒序排列if(this.sumFlow o.sumFlow){return -1;}else if(this.sumFlow o.sumFlow){return 1;}else {return 0;}// return -this.getSumFlow().compareTo(o.getSumFlow());} }注意在public class FlowBean implements WritableComparableFlowBean中我将WritableComparable写成Writable, Comparable出现ClassCastException报错。参考MapReduce——ClassCastException报错如何解决中方法二 2.5.4 Combiner 合并 Combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件Combiner 的父类就是 Reducer 。 Combiner 和 Reducer 的区别就在于运行的位置 Combiner 是在每一个 MapTask 所在的节点运行Reducer 是接收全局所有 Mapper 的输出结果。 Combiner 的意义就是对每一个 MapTask 的输出进行局部汇总以减小网络的传输量。总的来说就是为了减轻 ReduceTask 的压力减少了IO开销提升 MR 的运行效率。 注意 Combiner 能够运用的前提是不能影响最终的业务逻辑而且 Combiner 的输出 kv 应该和 Reducer 的输入 kv 类型要对应起来。 以 WordCount 案例为例 1增加一个WordCountCombiner类继承Reducer package com.huwei.mr.combiner;import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** 自定义Combiner类需要继承Hadoop提供的Reducer类* 注意Combiner流程一定发生在Map阶段*/ public class WordCountCombiner extends ReducerText, IntWritable, Text, IntWritable{private Text outk new Text();private IntWritable outv new IntWritable();Overrideprotected void reduce(Text key, IterableIntWritable values, ReducerText, IntWritable, Text, IntWritable.Context context) throws IOException, InterruptedException {int total 0;// 遍历valuesfor (IntWritable value : values) {// 对value累加输出结果total value.get();}// 封装key和valueoutk.set(key);outv.set(total);context.write(outk, outv);} }2在WordcountDriver驱动类中指定 Combiner // 指定自定义的Combiner类 job.setCombinerClass(WordCountCombiner.class);运行程序如下图所示 注意 Combiner不适用的场景Reduce端处理的数据考虑到多个MapTask的数据的整体集时就不能提前合并了。如求平均数 2.6 MapReduce 工作流程 上面的流程是整个 MapReduce 的工作流程其中从第7步开始到第16步结束为Shuffle过程。 3 Join 应用 3.1 Reduce Join 1案例需求 其中 order.txt 1001 01 1 1002 02 2 1003 03 3 1004 01 4 1005 02 5 1006 03 6pd.txt 01 小米 02 华为 03 格力2需求分析 通过将关联条件作为 Map 输出的 key将两表满足 Join 条件的数据并携带数据所来源的文件信息发往同一个 ReduceTask在 Reduce 中进行数据的串联。 3代码编写 商品和订单表合并后的对象类 Orderpd.java package com.huwei.mr.reducejoin;import org.apache.hadoop.io.Writable;import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;public class Orderpd implements Writable {// order表数据private String orderId;private String pid;private Integer amount;// pd表数据private String pname;// 区分数据来源判断是order表还是pd表的标志字段private String flag;public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId orderId;}public String getPid() {return pid;}public void setPid(String pid) {this.pid pid;}public Integer getAmount() {return amount;}public void setAmount(Integer amount) {this.amount amount;}public String getPname() {return pname;}public void setPname(String pname) {this.pname pname;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag flag;}Overridepublic String toString() {return Orderpd{ orderId orderId \ , pname pname \ , amount amount \ };}/*** 序列化* param dataOutput* throws IOException*/Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(orderId);dataOutput.writeUTF(pid);dataOutput.writeInt(amount);dataOutput.writeUTF(pname);dataOutput.writeUTF(flag);}/*** 反序列化* param dataInput* throws IOException*/Overridepublic void readFields(DataInput dataInput) throws IOException {orderId dataInput.readUTF();pid dataInput.readUTF();amount dataInput.readInt();pname dataInput.readUTF();flag dataInput.readUTF();} }Map端 ReduceJoinMapper.java package com.huwei.mr.reducejoin;import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;/*** Map端输出的 key是 pidvalue是 Orderpd 对象*/ public class ReduceJoinMapper extends MapperLongWritable, Text, Text, Orderpd {private Text outk new Text();private Orderpd outv new Orderpd();private FileSplit inputSplit;Overrideprotected void setup(MapperLongWritable, Text, Text, Orderpd.Context context) throws IOException, InterruptedException {// 获取切片对象inputSplit (FileSplit) context.getInputSplit();}/*** 业务处理方法 —— 将两个需要做关联的文件数据进行搜集** param key* param value* param context* throws IOException* throws InterruptedException*/Overrideprotected void map(LongWritable key, Text value, MapperLongWritable, Text, Text, Orderpd.Context context) throws IOException, InterruptedException {// 获取当前行数据String line value.toString();// 切割数据String[] datas line.split(\t);// 将当前数据封装到Textkey、Orderpdvalue中// 先判断数据来源于哪个表if (inputSplit.getPath().getName().contains(order)) {// 当前数据来源于order表// 封装输出数据的keyoutk.set(datas[1]); // pid// 封装输出数据的valueoutv.setOrderId(datas[0]); // idoutv.setPid(datas[1]); // pidoutv.setAmount(Integer.parseInt(datas[2])); // amountoutv.setPname(); // pnameorder表中没有该字段但不能不设置否则该属性为null不能被序列化会报错outv.setFlag(order);} else {// 当前数据来源于pd表// 封装输出数据的keyoutk.set(datas[0]); // pid// 封装输出数据的valueoutv.setOrderId(); // idoutv.setPid(datas[0]); // pidoutv.setAmount(0); // amountoutv.setPname(datas[1]); // pnameoutv.setFlag(pd);}// 将数据写出context.write(outk, outv);} }注意在进行序列化和反序列化操作时如果对象中存在 null 值就可能会出现报错的情况。在上述封装对象的过程中如果表中没有某个字段也不能不设置只需设置该数据类型的默认值否则该属性为null不能被序列化会报错。 Reduce 端 ReduceJoinReducer.java package com.huwei.mr.reducejoin;import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList;public class ReduceJoinReducer extends ReducerText, Orderpd, Orderpd, NullWritable {private Orderpd orderpd new Orderpd();private ArrayListOrderpd orderList new ArrayList();/*** 业务处理方法 —— 接收Map端整合好的数据进行最终的join操作** param key* param values* param context* throws IOException* throws InterruptedException*/Overrideprotected void reduce(Text key, IterableOrderpd values, ReducerText, Orderpd, Orderpd, NullWritable.Context context) throws IOException, InterruptedException {// 遍历当前相同key的一组valuesfor (Orderpd value : values) {// 判断当前数据来源// 当前数据来源于order文件if (order.equals(value.getFlag())) {Orderpd thisorder new Orderpd();// 将当前传入Orderpd对象value复制到新创建的对象thisorder中去// 参数1目标参数参数2原始参数try {BeanUtils.copyProperties(thisorder, value);orderList.add(thisorder);} catch (IllegalAccessException e) {throw new RuntimeException(e);} catch (InvocationTargetException e) {throw new RuntimeException(e);}} else {// 当前数据来源于pd文件try {BeanUtils.copyProperties(orderpd, value);} catch (IllegalAccessException e) {throw new RuntimeException(e);} catch (InvocationTargetException e) {throw new RuntimeException(e);}}}// 进行Join操作for (Orderpd op : orderList) {op.setPname(orderpd.getPname());// 将数据写出context.write(op, NullWritable.get());}// 清空 orderListorderList.clear();} }注意 在Java中在遍历每一个对象时都会在堆里新创建对象而在hadoop中由于内存资源在Hadoop中是极为珍贵的当遍历每一个对象时不会在堆中新创建对象也就是说栈中对象所有的引用都指向堆中一个对象每次遍历都会动态修改对象的值。这样会导致集合中的对象会被下一个对象覆盖。NullWritable是Writable的一个特殊类实现方法为空实现不从数据流中读数据也不写入数据只充当占位符如在MapReduce中如果你不需要使用键或值你就可以将键或值声明为NullWritable,NullWritable是一个不可变的单实例类型。 Driver 端 ReduceJoinDriver.java package com.huwei.mr.reducejoin;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class ReduceJoinDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 声明配置对象Configuration conf new Configuration();// 声明Job对象Job job Job.getInstance(conf);// 指定当前Job的驱动类job.setJarByClass(ReduceJoinDriver.class);// 指定当前Job的Mapperjob.setMapperClass(ReduceJoinMapper.class);// 指定当前Job的Reducerjob.setReducerClass(ReduceJoinReducer.class);// 指定Map端输出数据的key的类型和输出数据value的类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Orderpd.class);// 指定最终Reduce端输出数据的key的类型和输出数据value的类型job.setOutputKeyClass(Orderpd.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path(E:\\hadoop\\in\\reducejoin));FileOutputFormat.setOutputPath(job, new Path(E:\\hadoop\\out\\reducejoin));// 提交Jobjob.waitForCompletion(true);} }缺点 这种方式中合并的操作是在Reduce阶段完成Reduce端的处理压力太大Map节点的运算负载则很低资源利用率不高且在Reduce阶段极易产生数据倾斜。 解决方案Map端实现数据合并。 逻辑处理接口Mapper 用户根据业务需求实现其中三个方法map() setup() cleanup () 逻辑处理接口Reducer 用户根据业务需求实现其中三个方法reduce() setup() cleanup () 3.2 Map Join 在Reduce端处理过多的表非常容易产生数据倾斜。怎么办 优点在Map端缓存多张表提前处理业务逻辑这样增加 Map 端业务减少 Reduce 端数据的压力尽可能的减少数据倾斜。 使用场景Map Join适用于一张表十分小、一张表很大的场景。 具体办法采用 DistributedCache 在 Mapper 的setup阶段将文件读取到缓存集合中。在 Driver 驱动类中加载缓存。 Map Join思路分析当 MapTask 执行的时候先把数据量较小的文件 pd.txt 缓存到内存当中去。MapTask 正常将 order.txt 的数据读取输入每处理一行数据就可以根据文件中的 pid 作为 key 到内存中的 HashMap 中获取对应的 pname。 代码编写 Map 端 MapJoinMapper.java package com.huwei.mr.mapjoin;import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map;/*** 1. 处理缓存文件从job设置的缓存路径中获取到* 2. 根据缓存文件的路径再结合输入流把pd.txt文件的内容写入到内存中的容器中维护*/ public class MapJoinMapper extends MapperLongWritable, Text, Text, NullWritable {private MapString, String pdMap new HashMap();private Text outk new Text();/*** 处理缓存文件** param context* throws IOException* throws InterruptedException*/Overrideprotected void setup(MapperLongWritable, Text, Text, NullWritable.Context context) throws IOException, InterruptedException {//通过缓存文件得到小表数据pd.txtURI[] cacheFiles context.getCacheFiles();URI cacheFile cacheFiles[0];// 准备输入流对象FileSystem fs FileSystem.get(context.getConfiguration());FSDataInputStream pd fs.open(new Path(cacheFile));// 通过流对象将数据读入保存到内存的HashMap中BufferedReader reader new BufferedReader(new InputStreamReader(pd, UTF-8));// 按行读取String line;while ((line reader.readLine()) ! null) {// 将数据保存到HashMap中String[] datas line.split(\t);pdMap.put(datas[0], datas[1]);}// 关闭资源IOUtils.closeStream(pd);}Overrideprotected void map(LongWritable key, Text value, MapperLongWritable, Text, Text, NullWritable.Context context) throws IOException, InterruptedException {// 获取order.txt当前行数据String lineData value.toString();// 切割数据String[] orderDatas lineData.split(\t);// 进行数据关联获取pnameString pname pdMap.get(orderDatas[1]);// 封装输出结果String result orderDatas[0] \t pname \t orderDatas[2];outk.set(result);// 将结果写出context.write(outk,NullWritable.get());} } Driver MapJoinDriver package com.huwei.mr.mapjoin;import com.huwei.mr.reducejoin.Orderpd; import com.huwei.mr.reducejoin.ReduceJoinDriver; import com.huwei.mr.reducejoin.ReduceJoinMapper; import com.huwei.mr.reducejoin.ReduceJoinReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException; import java.net.URI;public class MapJoinDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 声明配置对象Configuration conf new Configuration();// 声明Job对象Job job Job.getInstance(conf);// 指定当前Job的驱动类job.setJarByClass(MapJoinDriver.class);// 指定当前Job的Mapperjob.setMapperClass(MapJoinMapper.class);// 指定Map端输出数据的key的类型和输出数据value的类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);// 指定最终Reduce端输出数据的key的类型和输出数据value的类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 设置reduceTask的数量为0job.setNumReduceTasks(0);// 设置缓存文件的路径job.addCacheFile(URI.create(file:///E:/hadoop/in/cachefile/pd.txt));FileInputFormat.setInputPaths(job, new Path(E:\\hadoop\\in\\mapjoin));FileOutputFormat.setOutputPath(job, new Path(E:\\hadoop\\out\\mapjoin));// 提交Jobjob.waitForCompletion(true);} }
http://www.zqtcl.cn/news/93855/

相关文章:

  • 德州建设银行兑换网站服务器网站跳转怎么做的
  • 金华专业做网站公司湖南网站建设服务
  • 企业网站设计沈阳苏宁电器网站建设特点分析
  • 建设工程类公司网站易语言可以做api网站对接吗
  • 青岛做网站皆赴青岛博wordpress 数据库 备份
  • 外贸公司网站空间哈尔滨seo优化专注
  • 建筑行业综合查询平台优化推广联盟
  • 北京管庄网站建设公司开平网站制作
  • 如何做销售直播网站最专业网站建设
  • 太原市住房和城乡建设局的网站首页网络推广服务外包公司
  • 湘icp备 网站建设 农业 湖南稿定设计免费版
  • 公司网站推广方法陕西省住房建设厅官网
  • 网站关键词排名突然没了无锡企业网站建设报价
  • 找做网站的人网站改版 301跳转
  • 网站备案一次就可以了吧营销管理培训课程
  • 怎么做网站背景专做民宿预定的网站
  • wordpress安装谷歌分析代码建网站seo
  • 百度外卖网站建设与维护方法建设 银行网网站
  • 小程序开发定制开发上海优化价格
  • 来宾住房和城乡建设局网站做外贸推广要做哪些平台
  • 无锡建设网站制作wordpress 知乎
  • 动漫网站源码免费怎么怎么做网站
  • 和两个黑人同时做网站中工互联网站建设
  • windows10PHP 网站建设app应用分发平台开发
  • 中唯建设工程有限公司网站做网站页面对PS切图
  • 个人网页制作成品欣赏seo网站沙盒期
  • 亚马逊站外推广网站怎么做制作营销网站模板免费下载
  • 加拿大网站后缀设计师互联网
  • 做物流的网站有哪些内容共同建设网站心得
  • 主题资源网站建设什么网站做污水处理药剂的好