建站管理过程,新型网络营销模式,网站开发与维护岗位说明书,广州市场监督管理局官网MapReduce被广泛应用于日志分析、海量数据排序、在海量数据中查找特定模式等场景中。 MapReduceJob
在Hadoop中#xff0c;每个MapReduce任务都被初始化为一个Job。
每个Job又可以分为两个阶段#xff1a;Map阶段和Reduce阶段。这两个阶段分别用Map函数和Reduce函数来表示。…MapReduce被广泛应用于日志分析、海量数据排序、在海量数据中查找特定模式等场景中。 MapReduceJob
在Hadoop中每个MapReduce任务都被初始化为一个Job。
每个Job又可以分为两个阶段Map阶段和Reduce阶段。这两个阶段分别用Map函数和Reduce函数来表示。
Map函数接收一个key,value形式的输入然后产生另一种key,value的中间输出Hadoop负责将所有具有相同中间key值的value集合到一起传递给Reduce函数Reduce函数接收一个如key,(list of values)形式的输入然后对这个value集合进行处理并输出结果Reduce的输出也是key,value形式的。 InputFormat()和InputSplit
InputSplit是Hadoop中用来把输入数据传送给每个单独的MapInputSplit存储的并非数据本身而是起始位置、分片长度和一个记录数据所在主机的数组。生成InputSplit的方法可以通过InputFormat()来设置。当数据传送给Map时Map会将输入分片传送到InputFormat()上InputFormat调用getRecordReader()方法生成RecordReaderRecordReader在通过creatKey()、createValue()方法将InputSplit创建成可供Map处理的key,value对。即InputFormat()方法是用来生成可供Map处理的key,value对的。
InputFormat BaileyBorweinPlouffe.BbpInputFormat ComposableInputFormat CompositeInputFormat DBInputFormat DistSum.Machine.AbstractInputFormat FileInputFormat CombineFileInputFormat KeyValueTextInputFormat NLineInputFormat SequenceFileInputFormat TeraInputFormat TextInputFormat TextInputFormat是Hadoop默认的输入方式。在TextInputFormat中每个文件或其一部分都会单独作为Map的输入而这是继承自FileInputFormat的。之后每行数据都会生成一条记录每条记录则表示成key,value形式。
key值是每个数据记录在数据分片中的字节偏移量数据类型是LongWritable
value值是每行的内容数据类型是Text。
如
file1:
0 hello world bye world
file2:
0 hello hadoop bye hadoop
两个文件都会被单独输入到一个Map中因此它们的值都是0。 OutputFormat()
默认的输出格式是TextOutputFormat每条记录以一行的形式存入文本文件键和值是任意形式的程序内部调用toString()方法将键和值转换为String类型再输出。 Map函数和Reduce函数 public class WordCountMapper extends MapperLongWritable, Text, Text, IntWritable{protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {String[] words StringUtils.split(value.toString(), );for(String w :words){context.write(new Text(w), new IntWritable(1));}}
}public class WordCountReducer extends ReducerText, IntWritable, Text, IntWritable{protected void reduce(Text key, IterableIntWritable values,Context context)throws IOException, InterruptedException {int sum 0;for(IntWritable i: values){sumsumi.get();}arg2.write(key, new IntWritable(sum));}
}
public class RunJob {public static void main(String[] args) {Configuration config new Configuration();
// config.set(fs.defaultFS, hdfs://node1:8020);
// config.set(yarn.resourcemanager.hostname, node1);config.set(mapred.jar, C:\\Users\\Administrator\\Desktop\\wc.jar);try {FileSystem fs FileSystem.get(config);Job job Job.getInstance(config);job.setJarByClass(RunJob.class);job.setJobName(wc);job.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(/usr/input/));Path outpath new Path(/usr/output/wc);if(fs.exists(outpath)){fs.delete(outpath, true);}FileOutputFormat.setOutputPath(job, outpath);boolean f job.waitForCompletion(true);if(f){System.out.println(job completion);}} catch (Exception e) {e.printStackTrace();}}
}注意两种情况 1、Reduce Task的数量可以由程序指定当存在多个Reduce Task时每个Reduce会搜集一个或多个key值。当存在多个Reduce Task时每个Reduce Task都会生成一个输出文件
2、没有Reduce任务的时候系统会直接将Map的输出结果作为最终结果有多少个Map就有多少个输出。 MapReduce任务的优化
如何完成这个任务怎么能让程序运行的更快。
MapReduce计算模型的优化主要集中在两个方面计算性能方面IO操作方面。 1、任务调度
计算方面优先将任务分配给空闲机器
IO方面尽量将Map任务分配给InputSplit所在的机器。 2、数据预处理与InputSplit的大小
MapReduce擅长处理少量的大数据在处理大量的小数据时性能会很逊色。
因此在提交MapReduce任务前可以先对数据进行一次预处理将数据合并以提高MapReduce任务的执行效率。
另一方面是参考Map任务的运行时间当一个Map任务只需要运行几秒就可以结束时就需要考虑是否应该给它分配更多的数据。通常而言一个Map任务的运行时间在一分钟左右比较合适。
在FileInputFormat中除了CombineFileInputFormatHadoop会在处理每个Block后将其作为一个InputSplit因此合理地设置block块大小是很重要的调节方式。 3、Map和Reduce任务的数量
Map/Reduce任务槽集群能够同时运行的Map/Reduce任务的最大数量。
如100台机器每台最多同时运行10个Map和5个Reduce则Map任务槽为1000Reduce任务槽为500。
设置Map任务的数量主要参考的是Map的运行时间设置Reduce任务的数量主要参考的是Reduce槽的数量。
Reduce任务槽的0.95倍如果一个Reduce任务失败可以很快找到一个空闲的机器重新执行
Reduce任务槽的1.75倍执行快的机器可以获得更多的Reduce任务因此可以使负载更加均衡以提高任务的处理速度。 4、Combine函数
用于本地合并数据以减少网络IO操作的消耗。
合理的设计combine函数会有效减少网络传输的数据量提高MapReduce的效率。
job.setCombinerClass(combine.class);
在WordCount中可以指定Reduce类为combine函数
job.setCombinerClass(Reduce.class); 5、压缩
可以选择对Map的输出和最终的输出结果进行不同压缩方式的压缩。
在一些情况下Map的中间输出可能会很大对其进行压缩可以有效地减少网络上的数据传输量。 6、自定义comparator
自定义Hadoop数据类型时推荐自定义comparator来实现数据的比较这样可以省去数据序列化和反序列化的时间提高程序的运行效率。