宽带开户多少钱,苏州seo网站系统,网站建设教程 作业,苏州市网站在介绍这个实例之前#xff0c;请各位参考#xff1a;http://bjyjtdj.iteye.com/blog/1453410。
reduce side join是一种最简单的join方式#xff0c;其主要思想如下#xff1a; 在map阶段#xff0c;map函数同时读取两个文件File1和File2#xff0c;为了区分两种来源的…在介绍这个实例之前请各位参考http://bjyjtdj.iteye.com/blog/1453410。
reduce side join是一种最简单的join方式其主要思想如下 在map阶段map函数同时读取两个文件File1和File2为了区分两种来源的key/value数据对对每条数据打一个标签tag,比如tag0表示来自文件File1tag2表示来自文件File2。即map阶段的主要任务是对不同文件中的数据打标签。在reduce阶段reduce函数获取key相同的来自File1和File2文件的value list 然后对于同一个key对File1和File2中的数据进行join笛卡尔乘积。即reduce阶段进行实际的连接操作。在这个例子中我们假设有两个数据文件如下
user.csv文件
ID,NAME,SEX 1,user1,0 2,user2,0 3,user3,0 4,user4,1 5,user5,0 6,user6,0 7,user7,1 8,user8,0 9,user9,0
order.csv文件
USER_ID,NAME 1,order1 2,order2 3,order3 4,order4 7,order7 8,order8 9,order9 目前网上的例子大多是基于0.20以前版本的API写的所以咱们采用新的API来写具体代码如下 public class MyJoin
{public static class MapClass extends MapperLongWritable, Text, Text, Text{//最好在map方法外定义变量以减少map计算时创建对象的个数private Text key new Text();private Text value new Text();private String[] keyValue null;Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{//采用的数据输入格式是TextInputFormat//文件被分为一系列以换行或者制表符结束的行//key是每一行的位置偏移量,LongWritable类型//value是每一行的内容,Text类型所有我们要把key从value中解析出来keyValue value.toString().split(,, 2);this.key.set(keyValue[0]);this.value.set(keyValue[1]);context.write(this.key, this.value);}}public static class Reduce extends ReducerText, Text, Text, Text{//最好在reduce方法外定义变量以减少reduce计算时创建对象的个数private Text value new Text();Overrideprotected void reduce(Text key, IterableText values, Context context)throws IOException, InterruptedException{StringBuilder valueStr new StringBuilder();//values中的每一个值是不同数据文件中的具有相同key的值//即是map中输出的多个文件相同key的value值集合for(Text val : values){valueStr.append(val);valueStr.append(,);}this.value.set(valueStr.deleteCharAt(valueStr.length()-1).toString());context.write(key, this.value);}}public static void main(String[] args) throws Exception{Configuration conf new Configuration();Job job new Job(conf, MyJoin);job.setJarByClass(MyJoin.class);job.setMapperClass(MapClass.class);job.setReducerClass(Reduce.class);//job.setCombinerClass(Reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//分别采用TextInputFormat和TextOutputFormat作为数据的输入和输出格式//如果不设置这也是Hadoop默认的操作方式job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}转发:https://blog.csdn.net/huashetianzu/article/details/7819244