工程行业网站,内蒙古网站seo,网站改版会影响排名吗,网站建设常规自适应一、排序分组概述
MapReduce中排序和分组在哪里被执行 第3步中需要对不同分区中的数据进行排序和分组#xff0c;默认情况按照key进行排序和分组 二、排序
在Hadoop默认的排序算法中#xff0c;只会针对key值进行排序
任务#xff1a; 数据文件中#xff0c;如果按照第一…一、排序分组概述
MapReduce中排序和分组在哪里被执行 第3步中需要对不同分区中的数据进行排序和分组默认情况按照key进行排序和分组 二、排序
在Hadoop默认的排序算法中只会针对key值进行排序
任务 数据文件中如果按照第一列升序排列 当第一列相同时第二列升序排列 如果当第一列相同时求出第二列的最小值 自定义排序
1.封装一个自定义类型作为key的新类型将第一列与第二列都作为key
WritableComparable接口
定义
public interface WritableComparableT extends Writable, ComparableT {
}
自定义类型MyNewKey实现了WritableComparable的接口该接口中有一个compareTo()方法当对key进行比较时会调用该方法而我们将其改为了我们自己定义的比较规则从而实现我们想要的效果
private static class MyNewKey implements WritableComparableMyNewKey {long firstNum;long secondNum;public MyNewKey() {}public MyNewKey(long first, long second) {firstNum first;secondNum second;}Overridepublic void write(DataOutput out) throws IOException {out.writeLong(firstNum);out.writeLong(secondNum);}Overridepublic void readFields(DataInput in) throws IOException {firstNum in.readLong();secondNum in.readLong();}/** 当key进行排序时会调用以下这个compreTo方法*/Overridepublic int compareTo(MyNewKey anotherKey) {long min firstNum - anotherKey.firstNum;if (min ! 0) {// 说明第一列不相等则返回两数之间小的数return (int) min;} else {return (int) (secondNum - anotherKey.secondNum);}}}
2.改写最初的MapReduce方法函数 public static class MyMapper extendsMapperLongWritable, Text, MyNewKey, LongWritable {protected void map(LongWritable key,Text value,MapperLongWritable, Text, MyNewKey, LongWritable.Context context)throws java.io.IOException, InterruptedException {String[] spilted value.toString().split(\t);long firstNum Long.parseLong(spilted[0]);long secondNum Long.parseLong(spilted[1]);// 使用新的类型作为key参与排序MyNewKey newKey new MyNewKey(firstNum, secondNum);context.write(newKey, new LongWritable(secondNum));};} public static class MyReducer extendsReducerMyNewKey, LongWritable, LongWritable, LongWritable {protected void reduce(MyNewKey key,java.lang.IterableLongWritable values,ReducerMyNewKey, LongWritable, LongWritable, LongWritable.Context context)throws java.io.IOException, InterruptedException {context.write(new LongWritable(key.firstNum), new LongWritable(key.secondNum));};}三、分组
在Hadoop中的默认分组规则中也是基于Key进行的会将相同key的value放到一个集合中去
目的求出第一列相同时第二列的最小值 上面的例子看分组因为我们自定义了一个新的key它是以两列数据作为key的因此这6行数据中每个key都不相同产生6组 它们是1 1,2 1,2 2,3 1,3 2,3 3。
而实际上只可以分为3组分别是123。现在首先改写一下reduce函数代码
public static class MyReducer extendsReducerMyNewKey, LongWritable, LongWritable, LongWritable {protected void reduce(MyNewKey key,java.lang.IterableLongWritable values,ReducerMyNewKey, LongWritable, LongWritable, LongWritable.Context context)throws java.io.IOException, InterruptedException {long min Long.MAX_VALUE;for (LongWritable number : values) {long temp number.get();if (temp min) {min temp;}}context.write(new LongWritable(key.firstNum), new LongWritable(min));};} 自定义分组
为了针对新的key类型作分组我们也需要自定义一下分组规则
private static class MyGroupingComparator implementsRawComparatorMyNewKey {/** 基本分组规则按第一列firstNum进行分组*/Overridepublic int compare(MyNewKey key1, MyNewKey key2) {return (int) (key1.firstNum - key2.firstNum);}/** param b1 表示第一个参与比较的字节数组* * param s1 表示第一个参与比较的字节数组的起始位置* * param l1 表示第一个参与比较的字节数组的偏移量* * param b2 表示第二个参与比较的字节数组* * param s2 表示第二个参与比较的字节数组的起始位置* * param l2 表示第二个参与比较的字节数组的偏移量*/Overridepublic int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);}}
自定义了一个分组比较器MyGroupingComparator该类实现了RawComparator接口而RawComparator接口又实现了Comparator接口这两个接口的定义
public interface RawComparatorT extends ComparatorT {public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
public interface ComparatorT {int compare(T o1, T o2);boolean equals(Object obj);
}
分组实现步骤
1.MyGroupingComparator实现这两个接口 RawComparator中的compare()方法是基于字节的比较 Comparator中的compare()方法是基于对象的比较
由于在MyNewKey中有两个long类型每个long类型又占8个字节。这里因为比较的是第一列数字所以读取的偏移量为8字节。
2.添加对分组规则的设置 // 设置自定义分组规则 job.setGroupingComparatorClass(MyGroupingComparator.class);
3. 运行结果