淘宝客网站一定要备案吗,电商订单系统,做网站赚广告费多么,wordpress如何修改htmlHadoop 分布式缓存实现目的是在所有的MapReduce调用一个统一的配置文件#xff0c;首先将缓存文件放置在HDFS中#xff0c;然后程序在执行的过程中会可以通过设定将文件下载到本地具体设定如下#xff1a; public static void main(String[] arge) throws IOException, Clas…Hadoop 分布式缓存实现目的是在所有的MapReduce调用一个统一的配置文件首先将缓存文件放置在HDFS中然后程序在执行的过程中会可以通过设定将文件下载到本地具体设定如下 public static void main(String[] arge) throws IOException, ClassNotFoundException, InterruptedException{ Configuration confnew Configuration(); conf.set(fs.default.name, hdfs://192.168.1.45:9000); FileSystem fsFileSystem.get(conf); fs.delete(new Path(CASICJNJP/gongda/Test_gd20140104)); conf.set(mapred.job.tracker, 192.168.1.45:9001); conf.set(mapred.jar, /home/hadoop/workspace/jar/OBDDataSelectWithImeiTxt.jar); Job jobnew Job(conf,myTaxiAnalyze); DistributedCache.createSymlink(job.getConfiguration());// try { DistributedCache.addCacheFile(new URI(/user/hadoop/CASICJNJP/DistributeFiles/imei.txt), job.getConfiguration()); } catch (URISyntaxException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } job.setMapperClass(OBDDataSelectMaper.class); job.setReducerClass(OBDDataSelectReducer.class); //job.setNumReduceTasks(10); //job.setCombinerClass(IntSumReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(/user/hadoop/CASICJNJP/SortedData/20140104)); FileOutputFormat.setOutputPath(job, new Path(CASICJNJP/gongda/SelectedData)); System.exit(job.waitForCompletion(true)?0:1); } 代码中标红的为将HDFS中的/user/hadoop/CASICJNJP/DistributeFiles/imei.txt作为分布式缓存 public class OBDDataSelectMaper extends MapperObject, Text, Text, Text { String[] strs; String[] ImeiTimes; String timei; String time; private java.util.ListInteger ImeiList new java.util.ArrayListInteger(); protected void setup(Context context) throws IOException, InterruptedException { try { Path[] cacheFiles DistributedCache.getLocalCacheFiles(context .getConfiguration()); if (cacheFiles ! null cacheFiles.length 0) { String line; BufferedReader br new BufferedReader(new FileReader( cacheFiles[0].toString())); try { line br.readLine(); while ((line br.readLine()) ! null) { ImeiList.add(Integer.parseInt(line)); } } finally { br.close(); } } } catch (IOException e) { System.err.println(Exception reading DistributedCache: e); } } public void map(Object key, Text value, Context context) throws IOException, InterruptedException { try { strs value.toString().split(\t); ImeiTimes strs[0].split(_); timei ImeiTimes[0]; if (ImeiList.contains(Integer.parseInt(timei))) { context.write(new Text(strs[0]), value); } } catch (Exception ex) { } }} 上述标红代码中在Map的setup函数中加载分布式缓存。转载于:https://www.cnblogs.com/mfryf/p/5360306.html