网站建设 部署与发布试题,网站技术开发设计,技术支持 哈尔滨网站建设,微网站开发工具有哪些Hadoop的第二个核心组件#xff1a;MapReduce框架第二节 六、MapReduce的工作流程原理#xff08;简单版本#xff09;七、MapReduce中的序列化机制问题八、流量统计案例实现#xff08;序列化机制的实现#xff09; 六、MapReduce的工作流程原理#xff08;简单版本MapReduce框架第二节 六、MapReduce的工作流程原理简单版本七、MapReduce中的序列化机制问题八、流量统计案例实现序列化机制的实现 六、MapReduce的工作流程原理简单版本
1、客户端在执行MR程序时客户端先根据设置的InputFormat实现类去对输入的数据文件进行切片getSplits如果没有设置InputFormat实现类MR程序会使用默认的实现类TextInputFormat–FileInputFormat的子类进行切片规划生成一个切片规划文件
2、客户端的切片规划文件生成以后客户端还会把整个MR程序的配置项Configuration配置会封装成为一个job.xml文件同时还会把MR程序的代码包括job.xml文件、切片规划文件提交给资源调度器YARN/windowsCPU,资源调度器会先分配资源启动MRAPPMaster的进程
3、MRAPPMaster会根据切片规划的切片个数向资源调度器申请资源启动对应个数的MapTask任务去运行Mapper阶段的计算逻辑
4、MapTask启动成功以后会根据切片规划借助指定的InputFormat的实现类中createRecoder方法去对应的切片中读取k-v数据然后交给map方法做处理
5、map方法将切片的k-v数据处理完成会k-v数据写到一个内存缓冲区中(100M)如果内存缓冲区超过容量的80%会溢写磁盘溢写磁盘的时候会根据map输出的key值进行排序同时还会根据指定的Partitioner分区机制进行分区。溢写文件可能会存在多个等map阶段执行完成每一个MapTask对应的多个溢写文件以及缓冲区中还没有溢写的数据整体会进行一次合并形成一个最终的大文件分区排序
6、紧跟着MRAPPMaster会向资源管理器申请资源启动ReduceTaskReduceTask启动成功会从不同的MapTask的合并的大的溢写文件中去复制对应的分区的数据ReduceTask会对所有复制过来的数据再进行一次排序。
7、ReduceTask会对排好序的数据按照key进行分组分好组之后一组相同的key值调用一次reduce方法进行计算计算完成的数据会借助指定的OutputFormat类没有指定默认使用TextOutputFormat类 - FileOutputFormat实现子类将key-value数据写出到最终的结果文件中part-r-xxxxx 七、MapReduce中的序列化机制问题
MR程序的Map阶段和Reduce阶段都是要求输入的数据和输出的数据必须得是key-value键值对类型的数据而且key-value必须得是序列化类型的数据。
序列化将Java中的某种数据类型转成二进制数据
反序列化将二进制数据转换成某种数据类型
MR程序采用序列化机制的原因MR程序之所以要求输入和输出的数据是K-V类型的是因为MR程序是一个分布式计算程序MR程序可以在多个节点上同时运行的而且多个计算程序计算出来的结果可能跨节点跨网络进行数据传输的。如果数据要跨节点跨网络传输要求数据必须是二进制数据。MapReduce程序运行中Mapper阶段和Reducer阶段的输入和输出都是以key-value的格式进行的。同时Mapper和Reducer阶段的任务中需要的数据可能会跨网络或者跨节点传输因此我们就要求MR程序运行过程中所有的输入和输出的数据必须都得是可以被序列化的。
Hadoop在进行Key-Value的序列化的时候没有采用Java的序列化机制Serializable、Externalizable因为Java的序列化机制非常的笨重的因此Hadoop基于Java的序列化机制方式提供了一种全新的专门适用于MR程序的轻量级的序列化机制。
Hadoop中提供了两个接口Writable、WritableComparableHadoop提供的两个序列化机制。
Writable 只有序列化和反序列化的效果如果我们自定义的一个数据类型Java类要想当MR程序的value使用的话Java类必须实现Writable接口重写两个方法write - 序列化写、readFields - 反序列化读通过这两个方法规定序列化和反序列化的内容。 Writable的使用方式类似于Java中Externalizable序列化机制
WritableComparable 接口除了具备序列化和反序列化的能力以外还具备一个比较大小关系的方法。 如果自定义的数据类型Java类想当MR程序中的key值来使用必须实现此接口让自定义数据类型既可以进行序列化反序列化还可以进行大小的比较判断。 如果自定义的数据类型只想当作MR程序中的value来使用只需要实现Writable接口即可不需要比较大小。
Hadoop常见的序列化类型Hadoop把Java中包装类和String类型已经给我们封装好了对应的Hadoop序列化类型 —— 实现了WritableComparable接口
Java类型Hadoop Writable类型booleanBooleanWritablebyteByteWritableintIntWritablefloatFloatWritablelongLongWritabledoubleDoubleWritablestringTextmapMapWritablearrayArrayWritable
【注意】 1、如果以后MR程序运行没有报错但是输出目录没有任何的内容一般可能是因为输入和输出的key-value的自定义类型没有实现序列化。 2、如果自定义的JavaBean充当Reducer阶段输出key-value时最好把toString方法给重写了否则Reducer最后输出的结果是JavaBean的地址值。
八、流量统计案例实现序列化机制的实现
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;/*** JavaBean是Java中一种很干净的类类当中只具备私有化的属性、构造器、getter setter方法 hashCode equals方法 toString方法* 实体类实体类又是一种特殊的JavaBean当JavaBean是和数据库中数据表对应的类的时候JavaBean称之为实体类** JavaBean可以自己手动的生成也可以使用Lombok技术基于注解快速创建Java类* Lombok使用慎重Lombok对代码的侵占性是非常大的** 如果自定义的JavaBean要当MR程序的输入和输出的KV值最好让JavaBean存在一个无参构造器MR程序底层反射构建这个类的对象* 如果自定义的JavaBean要去充当Reducer阶段KEY和Value那也就意味着JavaBean的结果要写到最终的结果文件中JavaBean的数据往结果文件写的格式还是按照JavaBean的toString方法去写的。*/
public class FlowBean implements Writable {private Long upFlow;//上行流量private Long downFlow;//下行流量private Long sumFlow;//总流量public FlowBean() {}public FlowBean(Long upFlow, Long downFlow, Long sumFlow) {this.upFlow upFlow;this.downFlow downFlow;this.sumFlow sumFlow;}public Long getUpFlow() {return upFlow;}public void setUpFlow(Long upFlow) {this.upFlow upFlow;}public Long getDownFlow() {return downFlow;}public void setDownFlow(Long downFlow) {this.downFlow downFlow;}public Long getSumFlow() {return sumFlow;}public void setSumFlow(Long sumFlow) {this.sumFlow sumFlow;}Overridepublic boolean equals(Object o) {if (this o) return true;if (o null || getClass() ! o.getClass()) return false;FlowBean flowBean (FlowBean) o;return Objects.equals(upFlow, flowBean.upFlow) Objects.equals(downFlow, flowBean.downFlow) Objects.equals(sumFlow, flowBean.sumFlow);}Overridepublic int hashCode() {return Objects.hash(upFlow, downFlow, sumFlow);}Overridepublic String toString() {return upFlow \t downFlow \t sumFlow;}/*** 序列化写的方法* param out codeDataOuput/code to serialize this object into.* throws IOException*/Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}/*** 反序列化读取数据的方法* param in codeDataInput/code to deseriablize this object from.* throws IOException*/Overridepublic void readFields(DataInput in) throws IOException {upFlow in.readLong();downFlow in.readLong();sumFlow in.readLong();}
}import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;/*** 现在有一个文件 phone_data.txt文件中记录着手机号消耗的流量信息* 文件中每一行数据代表一条手机的流量消耗每一条数据是以\t制表符分割的多个字段组成的* 使用MR程序统计每一个手机号消耗的总的上行流量、总的下行流量、总流量*/
public class FlowDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {Configuration configuration new Configuration();configuration.set(fs.defaultFS,hdfs://192.168.31.104:9000);Job job Job.getInstance(configuration);//设置MR程序默认使用的InputFormat类 —— 负责进行切片 负责读取数据源的数据为key value类型的
// job.setInputFormatClass(FileInputFormat.class);//默认确实是FileInputFormat 但是是个 抽象类 MR程序默认使用的是这个抽象类的子类FileInputFormat.setInputPaths(job,/phone_data.txt);//封装Mapper阶段job.setMapperClass(FlowMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//封装Reducer阶段job.setReducerClass(FlowReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//封装输出结果路径
// job.setOutputFormatClass(FileOutputFormat.class);//MR程序要求输出路径不能提前存在 如果提前存在就会报错Path path new Path(/output);//是用来解决输出目录如果存在MR程序报错问题的FileSystem fileSystem FileSystem.get(new URI(hdfs://192.168.31.104:9000), configuration, root);if (fileSystem.exists(path)){fileSystem.delete(path,true);}FileOutputFormat.setOutputPath(job,path);//最后提交程序运行即可boolean b job.waitForCompletion(true);System.out.println(b?0:1);}
}import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** 读取切片数据一行数据读取一次 而且读取的key偏移量 value LongWritable Text* 输出的key手机号 value 是 Text FlowBean*/
public class FlowMapper extends MapperLongWritable, Text, Text, FlowBean {Overrideprotected void map(LongWritable key, Text value, MapperLongWritable, Text, Text, FlowBean.Context context) throws IOException, InterruptedException {String line value.toString();String[] array line.split(\t);String phoneNumber array[1];Long downFlow Long.parseLong(array[array.length - 2]);Long upFlow Long.parseLong(array[array.length - 3]);FlowBean flowBean new FlowBean(upFlow,downFlow,upFlow downFlow);//需要将这一条数据以手机号为key以flowBean为value输出给reducecontext.write(new Text(phoneNumber),flowBean);}
}import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/****/
public class FlowReducer extends ReducerText, FlowBean, Text, FlowBean {Overrideprotected void reduce(Text key, IterableFlowBean values, ReducerText, FlowBean, Text, FlowBean.Context context) throws IOException, InterruptedException {Long upFlowSum 0L;Long downFlowSum 0L;Long sumFlowSum 0L;for (FlowBean value : values) {upFlowSum value.getUpFlow();downFlowSum value.getDownFlow();sumFlowSum value.getSumFlow();//需要以手机号为key以flowBean为value将结果输出flowBean需要将我们计算出来总流量信息封装起来FlowBean flowBean new FlowBean(upFlowSum,downFlowSum,sumFlowSum);context.write(key,flowBean);}}
}package com.kang.flow02;import com.kang.flow.FlowDriver;
import jdk.nashorn.internal.runtime.regexp.joni.Config;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;/*** 基于以前统计的手机消耗流量信息的结果文件要求对结果文件进行二次分析得到以下结果* 1、要求对数据中的手机号按照归属地不同进行分区* 134开头的手机号 0号分区* 135开头的手机号 1号分区* 136开头的手机号 2号分区* 137开头的手机号 3号分区* 其余的手机号 4号分区* 2、同时还要求每一个分区按照消耗的总流量从高到底进行排序*/
public class FlowDriver02 {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {Configuration configuration new Configuration();Job job Job.getInstance(configuration);job.setJarByClass(FlowDriver02.class);job.setInputFormatClass(TextInputFormat.class);FileInputFormat.setInputPaths(job,new Path(/output/part-r-00000));job.setMapperClass(FlowMapper02.class);job.setMapOutputKeyClass(FlowBean02.class);job.setMapOutputValueClass(NullWritable.class);job.setPartitionerClass(FlowPartitioner.class);job.setReducerClass(FlowReducer02.class);job.setOutputKeyClass(FlowBean02.class);job.setOutputValueClass(NullWritable.class);job.setNumReduceTasks(5);Path path new Path(/output1);FileSystem fs FileSystem.get(new URI(hdfs://192.168.31.104:9000), configuration, root);if (fs.exists(path)){fs.delete(path);}FileOutputFormat.setOutputPath(job,path);boolean flag job.waitForCompletion(true);System.exit(flag?0:1);}
}class FlowMapper02 extends MapperLongWritable, Text,FlowBean02,NullWritable {Overrideprotected void map(LongWritable key, Text value, MapperLongWritable, Text, FlowBean02, NullWritable.Context context) throws IOException, InterruptedException {String line value.toString();String[] message line.split(\t);String phoneNumber message[0];Long upFlow Long.parseLong(message[1]);Long downFlow Long.parseLong(message[2]);Long sumFlow Long.parseLong(message[3]);FlowBean02 flowBean02 new FlowBean02(phoneNumber,upFlow,downFlow,sumFlow);context.write(flowBean02,NullWritable.get());}
}oneNumber.startsWith(137)) {return 3;}else {return 4;}
// String message flowBean02.toString();
// String[] array message.split(\t);
// String phoneNumber array[0];
// char w1 phoneNumber.charAt(0);
// char w2 phoneNumber.charAt(1);
// char w3 phoneNumber.charAt(2);
// if (w1 1 w2 3) {
// if (w3 4) return 0;
// if (w3 5) return 1;
// if (w3 6) return 2;
// if (w3 7) return 3;
// }
// return 4;}
}class FlowReducer02 extends ReducerFlowBean02,NullWritable,FlowBean02, NullWritable{Overrideprotected void reduce(FlowBean02 key, IterableNullWritable values, ReducerFlowBean02, NullWritable, FlowBean02, NullWritable.Context context) throws IOException, InterruptedException {context.write(key,NullWritable.get());}
}package com.kang.flow02;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;public class FlowBean02 implements WritableComparableFlowBean02 {private String phoneNumber;private Long upFlow;private Long downFlow;private Long sumFlow;public FlowBean02() {}public FlowBean02(String phoneNumber, Long upFlow, Long downFlow, Long sumFlow) {this.phoneNumber phoneNumber;this.upFlow upFlow;this.downFlow downFlow;this.sumFlow sumFlow;}public String getPhoneNumber() {return phoneNumber;}public void setPhoneNumber(String phoneNumber) {this.phoneNumber phoneNumber;}public Long getUpFlow() {return upFlow;}public void setUpFlow(Long upFlow) {this.upFlow upFlow;}public Long getDownFlow() {return downFlow;}public void setDownFlow(Long downFlow) {this.downFlow downFlow;}public Long getSumFlow() {return sumFlow;}public void setSumFlow(Long sumFlow) {this.sumFlow sumFlow;}Overridepublic boolean equals(Object o) {if (this o) return true;if (o null || getClass() ! o.getClass()) return false;FlowBean02 that (FlowBean02) o;return Objects.equals(phoneNumber, that.phoneNumber) Objects.equals(upFlow, that.upFlow) Objects.equals(downFlow, that.downFlow) Objects.equals(sumFlow, that.sumFlow);}Overridepublic int hashCode() {return Objects.hash(phoneNumber, upFlow, downFlow, sumFlow);}Overridepublic String toString() {return phoneNumber \t upFlow \t downFlow \t sumFlow;}Overridepublic int compareTo(FlowBean02 o) {if (this.sumFlow o.sumFlow){return 1;} else if (this.sumFlow o.sumFlow) {return -1;}else {return 0;}}Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(phoneNumber);out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}Overridepublic void readFields(DataInput in) throws IOException {phoneNumber in.readUTF();upFlow in.readLong();downFlow in.readLong();sumFlow in.readLong();}
}