Interview question: Can cache be followed by other operators, and is it an action operation?
Yes, other operators can follow it, and no, cache is not an action.
Source code analysis
Calling cache or persist on an RDD only specifies its storage level: the level is recorded in a member variable of the RDD, and nothing is actually cached at that point. The data is cached only when the RDD is actually computed.
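A minimal sketch of this behavior (assuming a live SparkContext named `sc`; the input path is hypothetical): cache merely records the storage level and returns the same RDD, so further operators chain onto it freely, and nothing runs until an action is called.

```scala
val rdd = sc.textFile("hdfs:///tmp/input.txt")   // hypothetical path
  .map(_.length)
  .cache()          // records the storage level, returns the same RDD, triggers no job
  .filter(_ > 0)    // other operators chain after cache without issue

rdd.count()         // first action: computes the lineage and caches the mapped data as a side effect
rdd.count()         // second action: the cached map output is reused; only the filter runs again
```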
Take HadoopRDD as an example; its compute method obtains the partition's input split and pulls the parent's data through iterator:

```scala
override def compute(split: Partition, context: TaskContext): Iterator[U] = {
  val partition = split.asInstanceOf[HadoopPartition]
  val inputSplit = partition.inputSplit.value
  f(inputSplit, firstParent[T].iterator(split, context))
}
```
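This compute-calls-parent-iterator pattern is not specific to HadoopRDD. A hypothetical custom RDD (MyMapRDD below is illustrative, not Spark code) follows the same shape, which is why the cache check inside iterator runs at every step of the lineage:

```scala
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Illustrative only: compute() always reaches the parent through iterator(),
// never through the parent's compute() directly, so cached blocks are picked up.
class MyMapRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U) extends RDD[U](prev) {
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    firstParent[T].iterator(split, context).map(f)   // iterator(), not compute()

  override protected def getPartitions: Array[Partition] = firstParent[T].partitions
}
```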
compute calls the iterator method:

```scala
/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should not be called by users directly, but is available for implementors of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
```
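Which branch iterator takes is decided entirely by the storageLevel field that cache/persist set earlier. A small sketch (again assuming a SparkContext `sc`) showing the flag flip:

```scala
import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 100)
assert(rdd.getStorageLevel == StorageLevel.NONE)   // iterator() would go to computeOrReadCheckpoint

rdd.persist(StorageLevel.MEMORY_AND_DISK)          // only mutates the member variable, no job runs
assert(rdd.getStorageLevel == StorageLevel.MEMORY_AND_DISK)  // iterator() now routes to getOrCompute
```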
Moving on to the getOrCompute method: from the rule used to generate the blockId, we can see that blocks and partitions correspond one-to-one.
```scala
@DeveloperApi
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
  override def name: String = "rdd_" + rddId + "_" + splitIndex
}
```
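For instance (an illustrative check, not from the original post): partition 3 of the RDD with id 5 is always stored under the block name rdd_5_3, so every partition maps to exactly one block and vice versa.

```scala
import org.apache.spark.storage.RDDBlockId

// One block per (rddId, partitionIndex) pair.
assert(RDDBlockId(5, 3).name == "rdd_5_3")
```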
On the executor side, getOrCompute calls SparkEnv.get.blockManager.getOrElseUpdate():

```scala
/**
 * Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached.
 */
private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
  val blockId = RDDBlockId(id, partition.index)
  var readCachedBlock = true
  // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
  SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
    readCachedBlock = false
    computeOrReadCheckpoint(partition, context)
  }) match {
    case Left(blockResult) =>
      if (readCachedBlock) {
        val existingMetrics = context.taskMetrics().inputMetrics
        existingMetrics.incBytesRead(blockResult.bytes)
        new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
          override def next(): T = {
            existingMetrics.incRecordsRead(1)
            delegate.next()
          }
        }
      } else {
        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
      }
    case Right(iter) =>
      new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
  }
}
```
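A hedged way to observe the result of this path from user code (assuming a SparkContext `sc`): after the first action, every partition's block should be registered with the BlockManager, which sc.getRDDStorageInfo reports.

```scala
val data = sc.parallelize(1 to 1000, 4).cache()
data.count()   // first action: getOrCompute stores blocks rdd_<id>_0 through rdd_<id>_3

sc.getRDDStorageInfo.foreach { info =>
  // Expect 4/4 partitions cached once every block has landed in the BlockManager.
  println(s"RDD ${info.id}: ${info.numCachedPartitions}/${info.numPartitions} cached, " +
    s"${info.memSize} bytes in memory")
}
```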
Finally, look at the getOrElseUpdate method in BlockManager, which is what actually caches the data:

```scala
/**
 * Retrieve the given block if it exists, otherwise call the provided makeIterator method
 * to compute the block, persist it, and return its values.
 *
 * @return either a BlockResult if the block was successfully cached, or an iterator if the block
 *         could not be cached.
 */
def getOrElseUpdate[T](
    blockId: BlockId,
    level: StorageLevel,
    classTag: ClassTag[T],
    makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
  // Attempt to read the block from local or remote storage. If it's present, then we don't need
  // to go through the local-get-or-put path.
  get[T](blockId)(classTag) match {
    case Some(block) =>
      return Left(block)
    case _ =>
      // Need to compute the block.
  }
  // Initially we hold no locks on this block.
  doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
    case None =>
      // doPut() didn't hand work back to us, so the block already existed or was successfully
      // stored. Therefore, we now hold a read lock on the block.
      val blockResult = getLocalValues(blockId).getOrElse {
        // Since we held a read lock between the doPut() and get() calls, the block should not
        // have been evicted, so get() not returning the block indicates some internal error.
        releaseLock(blockId)
        throw new SparkException(s"get() failed for block $blockId even though we held a lock")
      }
      // We already hold a read lock on the block from the doPut() call and getLocalValues()
      // acquires the lock again, so we need to call releaseLock() here so that the net number
      // of lock acquisitions is 1 (since the caller will only call release() once).
      releaseLock(blockId)
      Left(blockResult)
    case Some(iter) =>
      // The put failed, likely because the data was too large to fit in memory and could not be
      // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
      // that they can decide what to do with the values (e.g. process them without caching).
      Right(iter)
  }
}
```
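The Right(iter) branch is what makes cache a best-effort operation. A hypothetical sketch (the sizes are illustrative and assume the dataset exceeds the executors' storage memory): with MEMORY_ONLY, blocks that do not fit are simply not cached, and later actions recompute those partitions.

```scala
import org.apache.spark.storage.StorageLevel

// Hypothetical: assume this is far larger than the executors' storage memory.
val big = sc.parallelize(1 to 10000000, 100)
  .map(i => new Array[Byte](4096))
  .persist(StorageLevel.MEMORY_ONLY)   // no disk fallback at this storage level

big.count()  // doPutIterator fails for blocks that don't fit; getOrElseUpdate returns Right(iter)
big.count()  // partitions whose blocks were never stored are recomputed from scratch
```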