I. Source Code Download
Below is the download address for the official Hadoop source. I downloaded hadoop-3.2.4, so let's walk through it together:
Index of /dist/hadoop/core
II. Into the Source via WordCount
After loading the source into IDEA, find the org.apache.hadoop.examples.WordCount class (shortcut: double-press Shift and type WordCount):
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    // Build a new Configuration; its static block loads core-default.xml and core-site.xml.
    // If core-site.xml marks a property as final, user code cannot override it.
    Configuration conf = new Configuration();
    // Parse the generic options given on the command line and apply them to the configuration.
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    // Create a new Job from the configuration and a job name. Job is the submitter's view;
    // its Cluster is null at this point and is only created from conf when needed.
    Job job = Job.getInstance(conf, "word count");
    // Set the job's jar by finding where the example class came from; the job state is DEFINE here.
    job.setJarByClass(WordCount.class);
    // Set the Mapper for the job (must be a subclass of Mapper); this sets mapreduce.job.map.class.
    job.setMapperClass(TokenizerMapper.class);
    // Set the combiner for the job (must be a subclass of Reducer); this sets mapreduce.job.combine.class.
    job.setCombinerClass(IntSumReducer.class);
    // Set the Reducer for the job (must be a subclass of Reducer); this sets mapreduce.job.reduce.class.
    job.setReducerClass(IntSumReducer.class);
    // Set the job's output key class, i.e. mapreduce.job.output.key.class.
    job.setOutputKeyClass(Text.class);
    // Set the job's output value class, i.e. mapreduce.job.output.value.class.
    job.setOutputValueClass(IntWritable.class);
    // Set the input paths: mapreduce.input.fileinputformat.inputdir holds the comma-joined input paths.
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    // Set the output path, i.e. mapreduce.output.fileoutputformat.outputdir.
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    // Submit the job to the cluster and wait for it to finish; passing true means
    // monitor the job and print its status in real time.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The comments have been added above. Next, let's step into the source starting from job.waitForCompletion(true).
Many methods are involved along the way and we cannot look at every one of them, so here we only follow the methods on the main call path plus the most important ones.
1. Job.waitForCompletion
/**
 * Submit the job to the cluster and wait for it to finish.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 *         <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  // If the job is still in the DEFINE state, submit it.
  if (state == JobState.DEFINE) {
    // Submit the job to the cluster and return immediately.
    submit();
  }
  // If verbose is true, print the job status in real time.
  if (verbose) {
    // Monitor the job and print status as progress and tasks advance.
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    // The interval can be set via mapreduce.client.completion.pollinterval (default 5000 ms):
    // the interval, in milliseconds, at which the JobClient polls the MapReduce
    // ApplicationMaster for updates about job status.
    // For small test data a short interval is convenient; in production a longer interval
    // reduces client-server traffic.
    int completionPollIntervalMillis =
      Job.getCompletionPollInterval(cluster.getConf());
    // Check whether the job is complete. This is a non-blocking call.
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
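As the comments above note, the client polls for completion on a fixed interval. Below is a minimal, hypothetical driver snippet showing how that interval could be shortened when testing with small data; the class name and the 1000 ms value are illustrative only, while the property key and its 5000 ms default come from the discussion above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class PollIntervalDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Default is 5000 ms; a shorter interval gives quicker feedback on tiny test jobs,
    // at the cost of more client-server traffic.
    conf.setInt("mapreduce.client.completion.pollinterval", 1000);
    Job job = Job.getInstance(conf, "poll interval demo");
    // Prints 1000: the same value waitForCompletion() will use for its sleep loop.
    System.out.println(Job.getCompletionPollInterval(job.getConfiguration()));
    // ... configure mapper/reducer/paths as in WordCount, then:
    // System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}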
2. Job.submit
/**
 * Submit the job to the cluster and return immediately.
 * @throws IOException
 */
public void submit()
       throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  // Default to the new API unless it was explicitly set, or the old mapper/reducer
  // attributes are used.
  setUseNewAPI();
  // Build the Cluster for the authorized user via the impersonation (doAs) mechanism.
  // Cluster provides a way to access information about the map/reduce cluster.
  connect();
  // Get a JobSubmitter (literally, the one that submits the job) from the file system
  // and the client. The JobClient can submit jobs for execution and query the current
  // system state.
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
    ClassNotFoundException {
      // Internal method used to submit the job to the system.
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  // Change the job state to RUNNING.
  state = JobState.RUNNING;
  // Get the URL where this job's progress can be tracked.
  LOG.info("The url to track the job: " + getTrackingURL());
}
3. JobSubmitter.submitJobInternal
/**
 * Internal method for submitting jobs to the system.
 * The job submission process involves:
 *   1. Checking the input and output specifications of the job.
 *   2. Computing the InputSplits for the job.
 *   3. Setting up the requisite accounting information for the DistributedCache of the job, if necessary.
 *   4. Copying the job's jar and configuration to the map-reduce system directory on the distributed file system.
 *   5. Submitting the job to the ResourceManager and optionally monitoring its status.
 * @param job the configuration to submit
 * @param cluster the handle to the Cluster
 * @throws ClassNotFoundException
 * @throws InterruptedException
 * @throws IOException
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {

  // Validate the job output specification; if the output directory already exists,
  // throw an exception to avoid overwriting it.
  checkSpecs(job);

  // Get the Configuration from the Job. We just created the Job from a configuration,
  // so the two can be obtained from each other.
  Configuration conf = job.getConfiguration();
  // Load the MapReduce framework archive path into the distributed cache (conf).
  // If a path to the MapReduce framework archive is set (usually a public location on HDFS),
  // the framework archive is automatically distributed along with the job.
  addMRFrameworkToDistributedCache(conf);

  // Initialize the staging directory and return its path.
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

  // configure the command line options correctly on the submitting dfs
  // Returns the address of the local host, by retrieving the host name from the system
  // and resolving it into an InetAddress.
  // Note: the resolved address may be cached for a short period of time.
  // If there is a security manager that blocks the operation, an InetAddress representing
  // the loopback address is returned.
  // All network interfaces are inspected, but only the first InetAddress is returned.
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    // Set the submitting host's IP address.
    submitHostAddress = ip.getHostAddress();
    // Set the submitting host's hostname.
    submitHostName = ip.getHostName();
    // Set mapreduce.job.submithostname.
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    // Set mapreduce.job.submithostaddress.
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  // Allocate a unique jobId for the job.
  JobID jobId = submitClient.getNewJobID();
  // Set the jobId on the job.
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    // Set mapreduce.job.user.name.
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    // Set hadoop.http.filter.initializers. The default filter class is
    // org.apache.hadoop.http.lib.StaticUserWebFilter; here it is set to AmFilterInitializer.
    // The property is a comma-separated list of class names that must be subclasses of
    // FilterInitializer; these filters are applied to all user-facing jsp and servlet pages.
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    // Set mapreduce.job.dir.
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);
    // Obtain secrets and tokens and store them in the TokenCache.
    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      // Set the secret key for the MapReduce shuffle: shuffle transfers are authenticated,
      // so data integrity is protected.
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    // Check whether intermediate MapReduce spill files should be encrypted
    // (mapreduce.job.encrypted-intermediate-data, default false).
    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
      // If encryption is enabled, cap the maximum number of application attempts at 1
      // (default 2). This is the maximum number of attempts; on failure the
      // ApplicationMaster would otherwise be retried.
      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
      LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
              "data spill is enabled");
    }

    // Upload and configure the files, libjars, jobjars, and archives associated with the job.
    // If the shared cache is enabled, this client uses it for libjars, files, archives and jobjars:
    // 1. For resources that have already been successfully shared, keep using them in shared mode.
    // 2. For resources that are not in the cache and would need to be uploaded by the NM,
    //    do not ask the NM to upload them.
    copyAndConfigureFiles(job, submitJobDir);

    // Get the path of the job conf file.
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    // The key method here: it computes the input splits for the job.
    int maps = writeSplits(job, submitJobDir);
    // Set the number of maps (mapreduce.job.maps): number of splits == number of map tasks.
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // Get the maximum number of maps (mapreduce.job.max.map, default -1, i.e. unlimited).
    int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,
        MRJobConfig.DEFAULT_JOB_MAX_MAP);
    if (maxMaps >= 0 && maxMaps < maps) {
      throw new IllegalArgumentException("The number of map tasks " + maps +
          " exceeded limit " + maxMaps);
    }

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    // Get the queue name (mapreduce.job.queuename, default "default").
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    // Get the administrators of the given job queue (for Hadoop internal use only).
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    // Set mapred.queue.default.acl-administer-jobs.
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    // Check mapreduce.job.token.tracking.ids.enabled: whether to track the IDs of the
    // tokens used by the job (default false).
    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Set reservation info if it exists (mapreduce.job.reservation.id).
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write job file to submit dir (on HDFS).
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    // This calls YARNRunner.submitJob(); we look at that method below.
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
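Several of the properties read by submitJobInternal() above can also be set directly from a driver. The snippet below is only a sketch of that idea: the chosen values (queue name, map cap, encrypted spill) are illustrative, while the property keys are the ones discussed in the source above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SubmitPropertiesDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("mapreduce.job.queuename", "default");                     // MRJobConfig.QUEUE_NAME
    conf.setInt("mapreduce.job.max.map", 500);                          // MRJobConfig.JOB_MAX_MAP, -1 = unlimited
    conf.setBoolean("mapreduce.job.encrypted-intermediate-data", true); // encrypt intermediate spill files
    Job job = Job.getInstance(conf, "submit properties demo");
    // With encrypted spill enabled, submitJobInternal() caps the AM attempts at 1,
    // and if writeSplits() produces more than 500 splits the submission fails with
    // the IllegalArgumentException shown above.
    System.out.println(job.getConfiguration().get("mapreduce.job.queuename"));
  }
}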
3.1 JobSubmitter.writeSplits
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  JobConf jConf = (JobConf)job.getConfiguration();
  int maps;
  // getUseNewMapper() defaults to false, but Job.submit() set it to true via setUseNewAPI().
  if (jConf.getUseNewMapper()) {
    // The key method to look at next.
    maps = writeNewSplits(job, jobSubmitDir);
  } else {
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}
3.2 JobSubmitter.writeNewSplits
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  // Get the input format class; it can be set via mapreduce.job.inputformat.class
  // and defaults to TextInputFormat.class.
  InputFormat<?, ?> input =
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  // The key method: logically split the set of input files for the job.
  // Each InputSplit is assigned to an individual Mapper for processing
  // (number of splits == number of MapTasks).
  // Note: an InputSplit is a logical split (e.g. input file path + start offset) and does not
  // change the underlying blocks of the file.
  // The InputFormat also creates a RecordReader to read the InputSplit.
  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}
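The input format resolved above defaults to TextInputFormat but can be replaced from the driver. Here is a brief, hypothetical example using KeyValueTextInputFormat, which also ships with Hadoop; setInputFormatClass() simply writes mapreduce.job.inputformat.class, which is what getInputFormatClass() reads back in writeNewSplits().

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class InputFormatDemo {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "input format demo");
    // Replaces the default TextInputFormat; writeNewSplits() will now reflectively
    // instantiate KeyValueTextInputFormat and call its getSplits().
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    System.out.println(job.getConfiguration().get("mapreduce.job.inputformat.class"));
  }
}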
3.3 FileInputFormat.getSplits
/**
 * Generate the list of files and make them into FileSplits.
 * @param job the job context
 * @throws IOException
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  // getFormatMinSplitSize() returns 1.
  // getMinSplitSize(job) reads mapreduce.input.fileinputformat.split.minsize (default 1).
  // Take the max of the two; since both default to 1, minSize == 1 by default.
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  // Read mapreduce.input.fileinputformat.split.maxsize
  // (default Long.MAX_VALUE = 2^63 - 1 = 0x7fffffffffffffffL).
  long maxSize = getMaxSplitSize(job);

  // generate splits
  // Declare the split list.
  List<InputSplit> splits = new ArrayList<InputSplit>();
  // List the input directories, selecting only the files that match the regex.
  List<FileStatus> files = listStatus(job);

  // Read mapreduce.input.fileinputformat.input.dir.recursive (default false) and
  // mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs (default false).
  // ignoreDirs is true only when recursion is disabled and the ignore-subdirs flag is true.
  boolean ignoreDirs = !getInputDirRecursive(job)
    && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);

  // Loop over each input file and compute all the InputSplits.
  for (FileStatus file: files) {
    // Skip directories if they should be ignored.
    if (ignoreDirs && file.isDirectory()) {
      continue;
    }
    // FileStatus represents the client-side information of a file.
    // Path is the name of a file or directory in a FileSystem.
    // Get the Path from the FileStatus.
    Path path = file.getPath();
    // Get the length of this file in bytes.
    long length = file.getLen();
    // If the file is not empty:
    if (length != 0) {
      // BlockLocation represents the network location of a block, information about the hosts
      // holding the block replicas, and other block metadata, e.g. the file offset of the block,
      // its length, whether it is corrupt, etc.
      // For a 3-replica file, the offset and length of a BlockLocation are absolute values within
      // the file and the hosts are the 3 datanodes holding the replicas, for example:
      //   BlockLocation(offset: 0, length: BLOCK_SIZE, hosts: {"host1:9866", "host2:9866", "host3:9866"})
      // For an erasure-coded file, each BlockLocation represents a logical block group: the offset
      // is the offset of the block group in the file, the length is the total length of the block
      // group, and the hosts are all the datanodes holding the data and parity blocks of the group.
      // For an RS_3_2 coded file (3 data units and 2 parity units) a BlockLocation looks like:
      //   BlockLocation(offset: 0, length: 3 * BLOCK_SIZE, hosts: {"host1:9866", "host2:9866",
      //     "host3:9866", "host4:9866", "host5:9866"})
      BlockLocation[] blkLocations;
      // Check whether the file is an instance of LocatedFileStatus
      // and get the block location list of the file.
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      // Is the file splittable? Usually yes, but not if the file is stream-compressed.
      if (isSplitable(job, path)) {
        // Get the block size of this file; HDFS lets each file specify its own block size
        // and replication factor.
        long blockSize = file.getBlockSize();
        // Compute the split size for this file:
        //   Math.max(minSize, Math.min(maxSize, blockSize))
        // minSize defaults to 1 and maxSize to Long.MAX_VALUE, so by default
        // splitSize == blockSize, the block size of the file.
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        // Initially the remaining bytes equal the total length of the file.
        long bytesRemaining = length;
        // Loop while (remaining bytes / splitSize) > SPLIT_SLOP (1.1), i.e. keep splitting as
        // long as more than 1.1 times the block size remains (by default).
        // If a file has only one block, this loop is skipped entirely.
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          // length - bytesRemaining is the current offset within the file.
          // Find which block of the file this offset falls into.
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          // Add the split.
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        // This handles the last split, or the whole file when it is small enough
        // to have only a single block.
        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkIndex].getHosts(),
                     blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        if (LOG.isDebugEnabled()) {
          // Log only if the file is big enough to be splitted
          if (length > Math.min(file.getBlockSize(), minSize)) {
            LOG.debug("File is not splittable so no parallelization "
                + "is possible: " + file.getPath());
          }
        }
        // Make one split per file (number of splits == number of files); its location is the
        // first replica of the file, preferring hosts that have the replica cached.
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                    blkLocations[0].getCachedHosts()));
      }
    } else {
      //Create empty hosts array for zero length files
      // For an empty input file, create an empty split.
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  // Record the number of input files: mapreduce.input.fileinputformat.numinputfiles.
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return splits;
}
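To make the split arithmetic above concrete, here is a small self-contained sketch (the 300 MB file and 128 MB block size are made-up numbers, not from the source) that mirrors computeSplitSize() and the SPLIT_SLOP loop; with the default minSize/maxSize it yields splits of 128 MB, 128 MB and 44 MB, hence three map tasks for that file.

public class SplitSizeDemo {
  private static final double SPLIT_SLOP = 1.1;   // same constant as FileInputFormat

  // Mirrors FileInputFormat.computeSplitSize(blockSize, minSize, maxSize).
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    long length = 300 * mb;                        // hypothetical 300 MB input file
    long blockSize = 128 * mb;                     // hypothetical HDFS block size
    long minSize = 1, maxSize = Long.MAX_VALUE;    // the defaults discussed above

    long splitSize = computeSplitSize(blockSize, minSize, maxSize);  // == blockSize here
    long bytesRemaining = length;
    // Same loop shape as getSplits(): carve off splitSize-sized chunks while more than
    // 1.1 * splitSize remains; the tail becomes one final, smaller split.
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      System.out.println("split at offset " + (length - bytesRemaining) / mb
          + " MB, length " + splitSize / mb + " MB");
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      System.out.println("split at offset " + (length - bytesRemaining) / mb
          + " MB, length " + bytesRemaining / mb + " MB");
    }
    // Output: 128 MB + 128 MB + 44 MB => 3 splits, hence 3 map tasks for this file.
  }
}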
4. YARNRunner.submitJob
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {
  // Add tokens.
  addHistoryToken(ts);
  // Build all the information needed to launch the MapReduce ApplicationMaster:
  // 1. Set up the LocalResources: the local resources a container needs; the NodeManager
  //    localizes them before starting the container.
  // 2. Set up the security tokens.
  // 3. Set up the ContainerLaunchContext for the ApplicationMaster container: everything the
  //    NodeManager needs to launch the container, including the ContainerId, resources, who it
  //    is allocated to, security tokens, environment variables, the command that starts the
  //    container, the retry policy when the container exits with failure, and the binaries,
  //    jars, shared objects, auxiliary files, etc. the container requires.
  // 4. Set up the ApplicationSubmissionContext: everything the ResourceManager needs to launch
  //    the application's ApplicationMaster, including the ApplicationId, user, name, priority,
  //    the ContainerLaunchContext of the container that runs the ApplicationMaster, the maximum
  //    number of attempts, the attempt interval, and everything the NodeManager needs to handle
  //    the application logs.
  // 5. Set up the ApplicationMaster resource request.
  // 6. Set the label for the AM container request, if present.
  // 7. Set the label for the job containers.
  ApplicationSubmissionContext appContext =
    createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  // Submit to ResourceManager
  try {
    // Ultimately a YarnClient is used to submit the application to YARN.
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);

    ApplicationReport appMaster = resMgrDelegate
        .getApplicationReport(applicationId);
    String diagnostics =
        (appMaster == null ?
            "application report is null" : appMaster.getDiagnostics());
    if (appMaster == null
        || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
      throw new IOException("Failed to run job : " + diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
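For context, resMgrDelegate ultimately goes through the generic YarnClient API. The sketch below is a minimal, generic YARN submission under stated assumptions: it is not the MapReduce ApplicationMaster setup done in createApplicationSubmissionContext(), and the application name, queue, resources and the placeholder "sleep 30" launch command are illustrative only (a real ApplicationMaster would have to register with the ResourceManager after launch).

import java.util.Collections;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class MinimalYarnSubmit {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(conf);
    yarnClient.start();

    // Ask the ResourceManager for a new application id and an empty submission context.
    YarnClientApplication app = yarnClient.createApplication();
    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
    appContext.setApplicationName("minimal-demo");   // placeholder name
    appContext.setQueue("default");                  // placeholder queue

    // The AM container spec: here just a shell command instead of the MRAppMaster launch command.
    ContainerLaunchContext amContainer =
        ContainerLaunchContext.newInstance(null, null,
            Collections.singletonList("sleep 30"), null, null, null);
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(Resource.newInstance(1024, 1));   // 1 GB, 1 vcore for the AM

    ApplicationId appId = yarnClient.submitApplication(appContext);
    System.out.println("Submitted application " + appId);
    yarnClient.stop();
  }
}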
5. YarnClientImpl.submitApplication
/**
 * Submit a new application to YARN. This is a blocking call: it does not return an
 * ApplicationId until the submitted application has been submitted successfully and
 * accepted by the ResourceManager.
 * Users should provide the ApplicationId as part of the ApplicationSubmissionContext
 * parameter when submitting a new application.
 * Internally this invokes ApplicationClientProtocol.submitApplication() and afterwards
 * ApplicationClientProtocol.getApplicationReport().
 */
public ApplicationId
    submitApplication(ApplicationSubmissionContext appContext)
        throws YarnException, IOException {
  // Get the applicationId.
  ApplicationId applicationId = appContext.getApplicationId();
  if (applicationId == null) {
    throw new ApplicationIdNotProvidedException(
        "ApplicationId is not provided in ApplicationSubmissionContext");
  }
  // Build the SubmitApplicationRequest: the request information used to submit the
  // application to the ResourceManager.
  SubmitApplicationRequest request =
      Records.newRecord(SubmitApplicationRequest.class);
  request.setApplicationSubmissionContext(appContext);

  // Automatically add the timeline DT into the CLC
  // Only when the security and the timeline service are both enabled
  if (isSecurityEnabled() && timelineV1ServiceEnabled) {
    addTimelineDelegationToken(appContext.getAMContainerSpec());
  }

  //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
  // Submit the application.
  // This is the interface clients use to submit a new application to the ResourceManager.
  // The client must provide the details via SubmitApplicationRequest, e.g. the queue needed
  // to run the ApplicationMaster, the equivalent of the command used to launch the
  // ApplicationMaster, and so on.
  rmClient.submitApplication(request);

  int pollCount = 0;
  long startTime = System.currentTimeMillis();
  // The set of "still waiting" states.
  EnumSet<YarnApplicationState> waitingStates =
                               EnumSet.of(YarnApplicationState.NEW,
                               YarnApplicationState.NEW_SAVING,
                               YarnApplicationState.SUBMITTED);
  // The set of "failed to submit" states.
  EnumSet<YarnApplicationState> failToSubmitStates =
                                EnumSet.of(YarnApplicationState.FAILED,
                                YarnApplicationState.KILLED);
  while (true) {
    try {
      // Get the application report, which includes:
      //   the ApplicationId, the application's user, queue and name,
      //   the host running the ApplicationMaster, the RPC port of the ApplicationMaster,
      //   the tracking url, the various states of the ApplicationMaster,
      //   diagnostics in case of errors, the start time of the application,
      //   and the client token of the application if security is enabled.
      ApplicationReport appReport = getApplicationReport(applicationId);
      YarnApplicationState state = appReport.getYarnApplicationState();
      if (!waitingStates.contains(state)) {
        if(failToSubmitStates.contains(state)) {
          throw new YarnException("Failed to submit " + applicationId +
              " to YARN : " + appReport.getDiagnostics());
        }
        LOG.info("Submitted application " + applicationId);
        break;
      }

      long elapsedMillis = System.currentTimeMillis() - startTime;
      if (enforceAsyncAPITimeout() &&
          elapsedMillis >= asyncApiPollTimeoutMillis) {
        throw new YarnException("Timed out while waiting for application " +
            applicationId + " to be submitted successfully");
      }

      // Notify the client through the log every 10 poll, in case the client
      // is blocked here too long.
      if (++pollCount % 10 == 0) {
        LOG.info("Application submission is not finished, " +
            "submitted application " + applicationId +
            " is still in " + state);
      }
      try {
        // The poll interval can be set via yarn.client.app-submission.poll-interval (default 200 ms).
        Thread.sleep(submitPollIntervalMillis);
      } catch (InterruptedException ie) {
        String msg = "Interrupted while waiting for application "
            + applicationId + " to be successfully submitted.";
        LOG.error(msg);
        throw new YarnException(msg, ie);
      }
    } catch (ApplicationNotFoundException ex) {
      // FailOver or RM restart happens before RMStateStore saves
      // ApplicationState
      LOG.info("Re-submit application " + applicationId + " with the " +
          "same ApplicationSubmissionContext");
      rmClient.submitApplication(request);
    }
  }

  return applicationId;
}
III. Summary
1. Build the Configuration and load Hadoop's default configuration files core-default.xml and core-site.xml.
2. Parse the command-line arguments and apply the options the user configured.
3. Set up the job information, e.g. the main class, Mapper class, Reducer class, Combiner class, output format, and input/output paths.
4. Submit the job asynchronously, monitoring it and printing its status in real time.
5. Build the Cluster according to the user's identity and permissions and submit the job to it.
6. Check the input and output specifications of the job.
7. Compute the InputSplits of the job (format: input file path + start offset); by default the number of splits equals the number of blocks across all input files, and each split is processed by one Mapper.
8. If necessary, set up the requisite accounting information for the job's DistributedCache.
9. Copy the job's jar and configuration to the map-reduce system directory on the distributed file system.
10. Submit the job to the ResourceManager and optionally monitor its status.