一、Hadoop数据序列化的数据类型

| Java数据类型 | Hadoop数据类型 |
|---|---|
| int | IntWritable |
| float | FloatWritable |
| long | LongWritable |
| double | DoubleWritable |
| String | Text |
| boolean | BooleanWritable |
| byte | ByteWritable |
| map | MapWritable |
| array | ArrayWritable |

二、Hadoop的序列化

1. 什么是序列化

在 Java 中，序列化接口是 Serializable。Java 自带的序列化是一个重量级的序列化框架：一个对象被 Java 序列化之后会附带很多额外的信息（校验信息、header、继承体系等），不便于在网络中高效地传输，所以 Hadoop 开发了一套自己的序列化框架——Writable。

序列化就是把内存当中的对象转换为字节序列，以便于存储和网络传输；反序列化是将收到的字节序列或硬盘当中的持久化数据转换成内存中的对象。

2. 序列化的理解方法（自己悟的，不对勿喷~~）

比如下面流量统计案例中，流量的封装类 FlowBean 实现了 Writable 接口，其中定义了变量 upFlow、dwFlow、flowSum。在 Mapper 和 Reducer 类中创建 FlowBean 对象时，内存会分配空间存放这些对象，而内存中的对象不便于直接在网络中高效地传输。这时 FlowBean 中的序列化方法 write() 会把这些字段按顺序转换为字节序列写出，方便存储（例如写到磁盘）和传输；当需要把这些数据读回来时，FlowBean 中的反序列化方法 readFields() 再按同样的顺序把字节序列还原为内存中的对象。

3. 序列化特点

序列化与反序列化是分布式数据处理中经常出现的操作，比如 Hadoop 的通信是通过远程过程调用（RPC）实现的，这个过程就需要序列化。
特点：1. 紧凑 2. 快速 3. 可扩展 4. 可互操作

三、MapReduce的流量统计程序案例

1. 代码

/**
 * author: PrincessHug
 * date: 2019/3/23, 23:38
 * Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class FlowBean implements Writable {private long upFlow;private long dwFlow;private long flowSum;public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow upFlow;}public long getDwFlow() {return dwFlow;}public void setDwFlow(long dwFlow) {this.dwFlow dwFlow;}public long getFlowSum() {return flowSum;}public void setFlowSum(long flowSum) {this.flowSum flowSum;}public FlowBean() {}public FlowBean(long upFlow, long dwFlow) {this.upFlow upFlow;this.dwFlow dwFlow;this.flowSum upFlow dwFlow;}/*** 序列化* param out 输出流* throws IOException*/Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(dwFlow);out.writeLong(flowSum);}/*** 反序列化* param in* throws IOException*/Overridepublic void readFields(DataInput in) throws IOException {upFlow in.readLong();dwFlow in.readLong();flowSum in.readLong();}Overridepublic String toString() {return upFlow \t dwFlow \t flowSum;}
}public class FlowCountMapper extends MapperLongWritable, Text,Text,FlowBean {Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获取数据String line value.toString();//切分数据String[] fields line.split(\t);//封装数据String phoneNum fields[1];long upFlow Long.parseLong(fields[fields.length - 3]);long dwFlow Long.parseLong(fields[fields.length - 2]);//发送数据context.write(new Text(phoneNum),new FlowBean(upFlow,dwFlow));}
}public class FlowCountReducer extends ReducerText,FlowBean,Text,FlowBean {Overrideprotected void reduce(Text key, IterableFlowBean values, Context context) throws IOException, InterruptedException {//聚合数据long upFlow_sum 0;long dwFlow_sum 0;for (FlowBean f:values){upFlow_sum f.getUpFlow();dwFlow_sum f.getDwFlow();}//发送数据context.write(key,new FlowBean(upFlow_sum,dwFlow_sum));}
}public class FlowPartitioner extends PartitionerText,FlowBean {Overridepublic int getPartition(Text key, FlowBean value, int i) {//获取用来分区的电话号码前三位String phoneNum key.toString().substring(0, 3);//设置分区逻辑int partitionNum 4;if (135.equals(phoneNum)){return 0;}else if (137.equals(phoneNum)){return 1;}else if (138.equals(phoneNum)){return 2;}else if (139.equals(phoneNum)){return 3;}return partitionNum;}
}
public class FlowCountDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//获取配置定义工具Configuration conf new Configuration();Job job Job.getInstance();//设置运行类job.setJarByClass(FlowCountDriver.class);//设置Mapper类及Mapper输出数据类型job.setMapperClass(FlowCountMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//设置Reducer类及其输出数据类型job.setReducerClass(FlowCountReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//设置自定义分区job.setPartitionerClass(FlowPartitioner.class);job.setNumReduceTasks(5);//设置文件输入输出流FileInputFormat.setInputPaths(job,new Path(G:\\mapreduce\\flow\\in));FileOutputFormat.setOutputPath(job,new Path(G:\\mapreduce\\flow\\inpartitionout));//返回运行完成if (job.waitForCompletion(true)){System.out.println(运行完毕);}else {System.out.println(运行出错!);}}
}转载于:https://www.cnblogs.com/HelloBigTable/p/10590705.html