// Reconstructed from a scraped tutorial on Hadoop's custom Writable serialization.
//
// Background (from the original prose): Java's built-in Serializable is too
// heavyweight for Hadoop's network traffic, so Hadoop defines its own Writable
// mechanism. A custom bean used as a MapReduce value must (1) implement
// Writable, (2) have a public no-arg constructor (Hadoop instantiates it via
// reflection), (3) read fields in readFields() in exactly the order write()
// wrote them, and (4) override toString() (tab-separated) so TextOutputFormat
// prints it usefully. If the bean is used as a KEY it must also implement
// Comparable, because the shuffle sorts keys.
//
// Job: sum per-phone-number upstream/downstream/total traffic from a
// tab-separated log. Example rows (last-3rd column = upstream bytes,
// last-2nd column = downstream bytes, column 1 = phone number):
//   0  13152567890  www.baidu.com   90    100   200
//   1  16592992187  www.google.com  100   2000  200
//
// NOTE(review): in the original project each class below is public in its own
// file under package Flowsum; they are concatenated here because the scrape
// flattened them. Only FlowBean is declared public so this single unit compiles.

package Flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Hadoop value bean carrying upstream, downstream, and total traffic counts.
 *
 * <p>Serialization contract: {@link #readFields(DataInput)} must consume the
 * longs in exactly the order {@link #write(DataOutput)} produced them.
 */
public class FlowBean implements Writable {

    private long upFlow;   // upstream bytes
    private long downFlow; // downstream bytes
    private long sumFlow;  // always maintained as upFlow + downFlow

    /** No-arg constructor required: Hadoop creates instances via reflection. */
    public FlowBean() {
        super();
    }

    /** Convenience constructor; derives {@code sumFlow} from the two inputs. */
    public FlowBean(long upFlow, long downFlow) {
        super();
        set(upFlow, downFlow);
    }

    /**
     * Sets both counters and recomputes the total. Preferred over the
     * individual setters, which leave {@code sumFlow} stale.
     */
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    /** Serialization: field order here defines the wire format. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /** Deserialization: reads fields in the same order {@link #write} wrote them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /** Tab-separated so TextOutputFormat emits "key\tup\tdown\tsum" lines. */
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Getters/setters: Hadoop bean convention expects every field to have both.
    // Callers should prefer set(long, long) so sumFlow stays consistent.

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}

/**
 * Mapper: one input line -> (phoneNumber, FlowBean(up, down)).
 *
 * <p>Input key is the byte offset (unused); input value is one tab-separated
 * log line. Flow columns are indexed from the END of the row so a variable
 * number of middle columns is tolerated.
 */
class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Reused across map() calls to avoid per-record allocation.
    private final Text k = new Text();
    private final FlowBean v = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Read one line.
        String line = value.toString();

        // 2. Split on tabs; column 1 is the phone number.
        String[] fields = line.split("\t");
        k.set(fields[1]);

        // 3. Upstream / downstream counts are the 3rd- and 2nd-from-last columns.
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);

        // Fix: the original called setUpFlow/setDownFlow only, which left the
        // serialized sumFlow field stale (0); set() keeps it consistent.
        v.set(upFlow, downFlow);

        // 4. Emit.
        context.write(k, v);
    }
}

/**
 * Reducer: sums the per-phone upstream and downstream counts and emits one
 * (phoneNumber, FlowBean(totalUp, totalDown)) pair per key.
 */
class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    // Reused output bean to avoid per-key allocation.
    private final FlowBean v = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // 1. Accumulate both counters over all records for this phone number.
        long sumUpFlow = 0;
        long sumDownFlow = 0;
        for (FlowBean flowBean : values) {
            sumUpFlow += flowBean.getUpFlow();
            sumDownFlow += flowBean.getDownFlow();
        }
        v.set(sumUpFlow, sumDownFlow);

        // 2. Emit the totals (sumFlow is derived inside set()).
        context.write(key, v);
    }
}

/**
 * Driver: configures and submits the flow-count job.
 *
 * <p>Usage: {@code FlowCountDriver <inputPath> <outputPath>}. The output
 * directory must not already exist (Hadoop convention).
 */
class FlowCountDriver {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Create the job from a fresh configuration.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Jar lookup by driver class.
        job.setJarByClass(FlowCountDriver.class);

        // 3. Wire up mapper and reducer.
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 4. Mapper output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5. Final output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. I/O paths from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit and reflect success/failure in the process exit code
        //    (the original discarded waitForCompletion's boolean result).
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}