Two implementations of BaseBatchReader

BaseBatchReader reads the underlying files in a batched, vectorized fashion. It has two implementation classes.

ColumnarBatchReader

A reader created through the static method VectorizedSparkParquetReaders::buildReader(). Its key characteristics: it supports reading delete files, it reads Parquet files directly into the Arrow format, and the data it ultimately returns is of type ColumnarBatch, an implementation class in Spark.

public static ColumnarBatchReader buildReader(
    Schema expectedSchema,
    MessageType fileSchema,
    Map<Integer, ?> idToConstant,
    DeleteFilter<InternalRow> deleteFilter) {
  return (ColumnarBatchReader)
      TypeWithSchemaVisitor.visit(
          expectedSchema.asStruct(),
          fileSchema,
          new ReaderBuilder(
              expectedSchema,
              fileSchema,
              NullCheckingForGet.NULL_CHECKING_ENABLED,
              idToConstant,
              ColumnarBatchReader::new,
              deleteFilter));
}

ArrowBatchReader

A reader created through the static method ArrowReader::buildReader(). Its key characteristics: it does not support reading delete files, it reads Parquet files directly into the Arrow format, and the result it returns is of type ColumnarBatch, an implementation class built into Iceberg itself (not Spark's). In Iceberg 1.2.x it is only used in test cases, so its implementation, which is simpler than ColumnarBatchReader's, is not discussed here.
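Before walking through how the ColumnarBatchReader is created (next section), here is a minimal sketch of how this code path is reached from the user side. It is not code from the article; the catalog and table names are made up, and the two table properties are the Iceberg read properties that control whether the vectorized Parquet path (and hence ColumnarBatchReader) is used and how many rows each ColumnarBatch holds.

// A minimal sketch, assuming a hypothetical Iceberg table demo.db.events registered in Spark.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class VectorizedReadExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("iceberg-vectorized-read").getOrCreate();

    // Iceberg table properties that steer the vectorized read path (values shown are the defaults):
    //   read.parquet.vectorization.enabled    -> when true, Parquet scans use ColumnarBatchReader
    //   read.parquet.vectorization.batch-size -> number of rows per ColumnarBatch
    spark.sql(
        "ALTER TABLE demo.db.events SET TBLPROPERTIES ("
            + "'read.parquet.vectorization.enabled'='true',"
            + "'read.parquet.vectorization.batch-size'='5000')");

    // A plain scan of the table now goes through the classes analyzed below.
    Dataset<Row> df = spark.table("demo.db.events");
    df.show(10);
  }
}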
Creating the ColumnarBatchReader

Creating a PartitionReader instance in DataSourceRDD::compute
// While computing the RDD's data, an instance of a concrete class implementing the PartitionReader
// interface is created through the call below.
// Here partitionReaderFactory is of type SparkColumnarReaderFactory.
// SparkColumnarReaderFactory is an Iceberg class; it overrides createColumnarReader(InputPartition)
// to return a PartitionReader<ColumnarBatch> instance.
val batchReader = partitionReaderFactory.createColumnarReader(inputPartition)
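For context, Spark's DataSource V2 read API defines the PartitionReaderFactory contract roughly as below (a condensed view of the Spark interface, not code from the article). Spark only calls createColumnarReader() for partitions where supportColumnarReads() returns true, which is why the Iceberg factory in the next step can return a columnar reader unconditionally.

// Condensed view of org.apache.spark.sql.connector.read.PartitionReaderFactory (Spark 3.x).
import java.io.Serializable;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public interface PartitionReaderFactory extends Serializable {
  // Row-based reader; every factory must provide this.
  PartitionReader<InternalRow> createReader(InputPartition partition);

  // Columnar reader; Spark calls this only when supportColumnarReads() returns true.
  default PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
    throw new UnsupportedOperationException("Cannot create columnar reader.");
  }

  default boolean supportColumnarReads(InputPartition partition) {
    return false;
  }
}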
PartitionReaderFactory.createColumnarReader creates a BatchDataReader instance

class SparkColumnarReaderFactory implements PartitionReaderFactory {

  public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition inputPartition) {
    SparkInputPartition partition = (SparkInputPartition) inputPartition;

    if (partition.allTasksOfType(FileScanTask.class)) {
      return new BatchDataReader(partition, batchSize);
    } else {
      throw new UnsupportedOperationException(
          "Unsupported task group for columnar reads: " + partition.taskGroup());
    }
  }
}
BatchDataReader::open creates a VectorizedParquetReader iterator
BatchDataReader::open
class BatchDataReader extends BaseBatchReader<FileScanTask>
    implements PartitionReader<ColumnarBatch> {

  @Override
  protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
    // Path of the data file
    String filePath = task.file().path().toString();
    LOG.debug("Opening data file {}", filePath);

    // update the current file for Spark's filename() function
    InputFileBlockHolder.set(filePath, task.start(), task.length());

    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());

    // Handle to the underlying file
    InputFile inputFile = getInputFile(filePath);
    Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");

    // Delete files that apply to this data file
    SparkDeleteFilter deleteFilter =
        task.deletes().isEmpty()
            ? null
            : new SparkDeleteFilter(filePath, task.deletes(), counter());

    // Return an iterator over the data file
    return newBatchIterable(
            inputFile,
            task.file().format(),
            task.start(),
            task.length(),
            task.residual(),
            idToConstant,
            deleteFilter)
        .iterator();
  }
}
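Everything open() needs comes from the FileScanTask it is handed. For reference, the parts of Iceberg's FileScanTask API used above look roughly like this (a condensed, approximate excerpt, not the full interface):

// Condensed, approximate view of org.apache.iceberg.FileScanTask, limited to the methods used above.
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.expressions.Expression;

public interface FileScanTask {
  DataFile file();            // the data file this task reads
  List<DeleteFile> deletes(); // delete files that must be applied to this data file
  long start();               // byte offset where this split starts
  long length();              // number of bytes this split covers
  Expression residual();      // residual filter to apply to the rows of this task
}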
BaseBatchReader::newBatchIterable creates a VectorizedParquetReader instance

VectorizedParquetReader is the topmost class; it provides the entry point for iterating over the file contents.

abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {

  protected CloseableIterable<ColumnarBatch> newBatchIterable(
      InputFile inputFile,
      FileFormat format,
      long start,
      long length,
      Expression residual,
      Map<Integer, ?> idToConstant,
      SparkDeleteFilter deleteFilter) {
    switch (format) {
      case PARQUET:
        // For PARQUET files, create an iterator over the Parquet file
        return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter);

      case ORC:
        // Not discussed here
        return newOrcIterable(inputFile, start, length, residual, idToConstant);

      default:
        throw new UnsupportedOperationException(
            "Format: " + format + " not supported for batched reads");
    }
  }

  private CloseableIterable<ColumnarBatch> newParquetIterable(
      InputFile inputFile,
      long start,
      long length,
      Expression residual,
      Map<Integer, ?> idToConstant,
      SparkDeleteFilter deleteFilter) {
    // get required schema if there are deletes
    Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();

    return Parquet.read(inputFile)
        .project(requiredSchema)
        .split(start, length)
        // The function used to create an instance of a BaseBatchReader implementation
        .createBatchedReaderFunc(
            fileSchema ->
                VectorizedSparkParquetReaders.buildReader(
                    requiredSchema, fileSchema, idToConstant, deleteFilter))
        .recordsPerBatch(batchSize)
        .filter(residual)
        .caseSensitive(caseSensitive())
        // Spark eagerly consumes the batches. So the underlying memory allocated could be reused
        // without worrying about subsequent reads clobbering over each other. This improves
        // read performance as every batch read doesn't have to pay the cost of allocating memory.
        .reuseContainers()
        .withNameMapping(nameMapping())
        .build();
  }
}
ColumnarBatchReader::new creates the ColumnarBatchReader instance

The VectorizedSparkParquetReaders.buildReader() method was outlined in the first section above.

public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
  private final boolean hasIsDeletedColumn;
  private DeleteFilter<InternalRow> deletes = null;
  private long rowStartPosInBatch = 0;

  // The only constructor. readers holds one reader per column field of the file; each of them is a
  // VectorizedArrowReader<T> instance implementing the VectorizedReader<T> interface.
  public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
    super(readers);
    // Inspect the type of each column reader to see whether the file exposes the built-in _deleted
    // metadata column, which marks whether a row has been deleted.
    this.hasIsDeletedColumn =
        readers.stream().anyMatch(reader -> reader instanceof DeletedVectorReader);
  }
}
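Each element of readers implements Iceberg's VectorizedReader contract. A condensed, approximate view of that interface, as it is used by the code in this article, is shown below; the exact signatures may differ slightly between Iceberg versions.

// Condensed, approximate view of org.apache.iceberg.parquet.VectorizedReader<T>.
import java.util.Map;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;

public interface VectorizedReader<T> {
  // Read up to numRows values, optionally reusing the container returned by the previous call.
  T read(T reuse, int numRows);

  // Bind the reader to a new Parquet row group before reading from it
  // (this is the call FileIterator::advance makes further below).
  void setRowGroupInfo(
      PageReadStore pages, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition);

  void reuseContainers(boolean reuse);

  void setBatchSize(int batchSize);

  void close();
}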
Reading the Parquet file

From the analysis above, the interface visible to the upper-level Spark RDD is provided by VectorizedParquetReader; the iterator it returns wraps all operations on the ColumnarBatchReader.

VectorizedParquetReader::iterator returns an iterator over the Parquet file
public class VectorizedParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {

  @Override
  public CloseableIterator<T> iterator() {
    FileIterator<T> iter = new FileIterator<>(init());
    addCloseable(iter);
    return iter;
  }
}
FileIterator::next reads the data

Since FileIterator implements Java's Iterator interface, the file contents can be pulled through this iterator while Spark computes the RDD; each call to next() returns a ColumnarBatch object.

/**
 * Here T is ColumnarBatch.
 */
private static class FileIterator<T> implements CloseableIterator<T> {

  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }

    if (valuesRead >= nextRowGroupStart) {
      // Entered on the first call, and whenever valuesRead reaches nextRowGroupStart, meaning a new
      // RowGroup is about to be read. After advance() returns, nextRowGroupStart points past the end
      // of the current RowGroup; the RowGroup itself has not been read yet, that is deferred to the
      // code below.
      advance();
    }

    // batchSize is an integer, so casting to integer is safe
    // Read from the current RowGroup, where:
    //   nextRowGroupStart is the start position of the next RowGroup
    //   valuesRead is the total number of rows read so far
    // nextRowGroupStart > valuesRead always holds here, and their difference is the number of rows
    // of the current RowGroup that have not been read yet.
    int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, batchSize);

    // Read the requested number of rows; model is the ColumnarBatchReader instance described above.
    if (reuseContainers) {
      this.last = model.read(last, numValuesToRead);
    } else {
      this.last = model.read(null, numValuesToRead);
    }
    // Accumulate the number of rows read
    valuesRead += numValuesToRead;

    return last;
  }

  /**
   * Move the read pointer to the start of the next RowGroup.
   */
  private void advance() {
    while (shouldSkip[nextRowGroup]) {
      nextRowGroup += 1;
      reader.skipNextRowGroup();
    }

    PageReadStore pages;
    try {
      pages = reader.readNextRowGroup();
    } catch (IOException e) {
      throw new RuntimeIOException(e);
    }

    // Look up, from the bound RowGroup metadata, the starting row position of the RowGroup being
    // advanced into, and bind the reader (model) to it.
    long rowPosition = rowGroupsStartRowPos[nextRowGroup];
    model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup), rowPosition);
    nextRowGroupStart += pages.getRowCount();
    nextRowGroup += 1;
  }
}
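To make the arithmetic in next() concrete, here is a toy, self-contained simulation (not Iceberg code; the row-group sizes and batch size are made up): with row groups of 1000 and 800 rows and a batch size of 300, the iterator produces batches of 300, 300, 300, 100, 300, 300, 200. A batch never crosses a RowGroup boundary.

// Standalone toy simulation of FileIterator's batch slicing.
public class BatchSlicingDemo {
  public static void main(String[] args) {
    long[] rowGroupSizes = {1000, 800};
    int batchSize = 300;

    long valuesRead = 0;
    long nextRowGroupStart = 0;
    int nextRowGroup = 0;

    while (nextRowGroup < rowGroupSizes.length || valuesRead < nextRowGroupStart) {
      if (valuesRead >= nextRowGroupStart) {
        // advance(): move the boundary to the end of the next row group
        nextRowGroupStart += rowGroupSizes[nextRowGroup];
        nextRowGroup += 1;
      }
      // Never read past the end of the current row group
      int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, batchSize);
      valuesRead += numValuesToRead;
      System.out.println("read a batch of " + numValuesToRead + " rows");
    }
  }
}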
ColumnarBatchReader::read

public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
  protected final VectorHolder[] vectorHolders;

  @Override
  public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
    if (reuse == null) {
      // If the current VectorHolders are not being reused to store data, close them
      closeVectors();
    }

    // The inner class ColumnBatchLoader acts as the delegate that performs the actual read.
    ColumnarBatch columnarBatch = new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();
    rowStartPosInBatch += numRowsToRead;
    return columnarBatch;
  }
}
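As a hedged sketch of the hand-off (the helper names below are made up; only read() and rowIterator() come from the code in this article): a consumer repeatedly calls read(), passing back the previous batch when containers are reused, and can walk each returned ColumnarBatch row by row through Spark's rowIterator(), which is the same row view applyEqDelete() relies on further below.

// Toy consumer sketch, not Iceberg code; "reader" stands for a ColumnarBatchReader already bound to
// a row group, and rowsLeft for the rows remaining in that row group.
import java.util.Iterator;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class BatchConsumerSketch {
  static long consume(VectorizedReader<ColumnarBatch> reader, long rowsLeft, int batchSize) {
    long rowsSeen = 0;
    ColumnarBatch batch = null;
    while (rowsLeft > 0) {
      int toRead = (int) Math.min(rowsLeft, batchSize);
      batch = reader.read(batch, toRead);      // same call FileIterator.next() makes
      Iterator<InternalRow> rows = batch.rowIterator();
      while (rows.hasNext()) {
        rows.next();                           // a real consumer would process the row here
        rowsSeen += 1;
      }
      rowsLeft -= toRead;
    }
    return rowsSeen;
  }
}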
ColumnBatchLoader::loadDataToColumnBatch reads the data and wraps it in a ColumnarBatch

private class ColumnBatchLoader {
  // Total number of rows to read
  private final int numRowsToRead;
  // the rowId mapping to skip deleted rows for all column vectors inside a batch, it is null when
  // there is no deletes
  private int[] rowIdMapping;
  // the array to indicate if a row is deleted or not, it is null when there is no _deleted
  // metadata column
  private boolean[] isDeleted;

  ColumnBatchLoader(int numRowsToRead) {
    Preconditions.checkArgument(
        numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);
    this.numRowsToRead = numRowsToRead;
    if (hasIsDeletedColumn) {
      isDeleted = new boolean[numRowsToRead];
    }
  }

  ColumnarBatch loadDataToColumnBatch() {
    // Filter the rows being read and get the number of rows that are not deleted
    int numRowsUndeleted = initRowIdMapping();

    // Read every column as Arrow data, represented as Spark ColumnVector instances
    ColumnVector[] arrowColumnVectors = readDataToColumnVectors();

    // Create a ColumnarBatch instance containing all live rows
    ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);
    newColumnarBatch.setNumRows(numRowsUndeleted);

    if (hasEqDeletes()) {
      // If equality delete files exist, rows also have to be filtered out by value. Filtering with
      // equality deletes needs the actual value of every row, so which rows to drop is only known
      // after the data has been read into memory.
      applyEqDelete(newColumnarBatch);
    }

    if (hasIsDeletedColumn && rowIdMapping != null) {
      // If there are deleted rows, reassign row ids starting from 0 in increasing order
      // reset the row id mapping array, so that it doesn't filter out the deleted rows
      for (int i = 0; i < numRowsToRead; i++) {
        rowIdMapping[i] = i;
      }
      newColumnarBatch.setNumRows(numRowsToRead);
    }

    return newColumnarBatch;
  }

  ColumnVector[] readDataToColumnVectors() {
    ColumnVector[] arrowColumnVectors = new ColumnVector[readers.length];

    ColumnVectorBuilder columnVectorBuilder = new ColumnVectorBuilder();
    for (int i = 0; i < readers.length; i += 1) {
      vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
      int numRowsInVector = vectorHolders[i].numValues();
      Preconditions.checkState(
          numRowsInVector == numRowsToRead,
          "Number of rows in the vector %s didn't match expected %s ",
          numRowsInVector,
          numRowsToRead);

      arrowColumnVectors[i] =
          columnVectorBuilder
              .withDeletedRows(rowIdMapping, isDeleted)
              .build(vectorHolders[i], numRowsInVector);
    }
    return arrowColumnVectors;
  }

  boolean hasEqDeletes() {
    return deletes != null && deletes.hasEqDeletes();
  }

  int initRowIdMapping() {
    Pair<int[], Integer> posDeleteRowIdMapping = posDelRowIdMapping();
    if (posDeleteRowIdMapping != null) {
      rowIdMapping = posDeleteRowIdMapping.first();
      return posDeleteRowIdMapping.second();
    } else {
      rowIdMapping = initEqDeleteRowIdMapping();
      return numRowsToRead;
    }
  }

  /**
   * If the current file has position delete files, the index structure below has to be built.
   */
  Pair<int[], Integer> posDelRowIdMapping() {
    if (deletes != null && deletes.hasPosDeletes()) {
      return buildPosDelRowIdMapping(deletes.deletedRowPositions());
    } else {
      return null;
    }
  }

  /**
   * Build a row id mapping inside a batch, which skips deleted rows. Here is an example of how we
   * delete 2 rows in a batch with 8 rows in total.
   *   [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
   *   [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array
   *   Position delete 2, 6
   *   [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
   *   [F,F,T,F,F,F,T,F] -- After applying position deletes
   *
   * @param deletedRowPositions a set of deleted row positions
   * @return the mapping array and the new num of rows in a batch, null if no row is deleted
   */
  Pair<int[], Integer> buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) {
    if (deletedRowPositions == null) {
      return null;
    }

    // Create an array that, starting at index 0, holds the ids of all rows that were not deleted.
    // Basic algorithm: two pointers that pack all live rows, in order, at the front of the array.
    int[] posDelRowIdMapping = new int[numRowsToRead];
    int originalRowId = 0; // index of the row currently being examined
    int currentRowId = 0;  // index of the next slot for a live row
    while (originalRowId < numRowsToRead) {
      if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) {
        // The current row is not deleted, so store its id at the slot currentRowId points to
        posDelRowIdMapping[currentRowId] = originalRowId;
        // currentRowId now points to the next slot to fill
        currentRowId++;
      } else {
        if (hasIsDeletedColumn) {
          isDeleted[originalRowId] = true;
        }

        deletes.incrementDeleteCount();
      }
      originalRowId++;
    }

    if (currentRowId == numRowsToRead) {
      // there is no delete in this batch
      return null;
    } else {
      return Pair.of(posDelRowIdMapping, currentRowId);
    }
  }

  int[] initEqDeleteRowIdMapping() {
    int[] eqDeleteRowIdMapping = null;
    if (hasEqDeletes()) {
      eqDeleteRowIdMapping = new int[numRowsToRead];
      for (int i = 0; i < numRowsToRead; i++) {
        eqDeleteRowIdMapping[i] = i;
      }
    }
    return eqDeleteRowIdMapping;
  }

  /**
   * Filter out the equality deleted rows. Here is an example.
   *   [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
   *   [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array
   *   Position delete 2, 6
   *   [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
   *   [F,F,T,F,F,F,T,F] -- After applying position deletes
   *   Equality delete 1 <= x <= 3
   *   [0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4]
   *   [F,T,T,T,F,F,T,F] -- After applying equality deletes
   *
   * @param columnarBatch the {@link ColumnarBatch} to apply the equality delete
   */
  void applyEqDelete(ColumnarBatch columnarBatch) {
    // Filter the rows that survived position deletes by value
    Iterator<InternalRow> it = columnarBatch.rowIterator();
    int rowId = 0;
    int currentRowId = 0;
    while (it.hasNext()) { // row-by-row traversal
      InternalRow row = it.next();
      if (deletes.eqDeletedRowFilter().test(row)) {
        // the row is NOT deleted
        // skip deleted rows by pointing to the next undeleted row Id
        // update the member field rowIdMapping
        rowIdMapping[currentRowId] = rowIdMapping[rowId];
        currentRowId++;
      } else {
        if (hasIsDeletedColumn) {
          isDeleted[rowIdMapping[rowId]] = true;
        }

        deletes.incrementDeleteCount();
      }

      rowId++;
    }

    // Update the final number of live rows
    columnarBatch.setNumRows(currentRowId);
  }
}
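The two-pointer packing in buildPosDelRowIdMapping can be reproduced in isolation. The toy program below (not Iceberg code; it replaces PositionDeleteIndex with a plain Set) replays the javadoc example: deleting positions 2 and 6 from an 8-row batch yields the mapping [0, 1, 3, 4, 5, 7], a live-row count of 6, and isDeleted flags set for rows 2 and 6.

// Standalone demonstration of the row-id-mapping algorithm used by buildPosDelRowIdMapping.
import java.util.Arrays;
import java.util.Set;

public class RowIdMappingDemo {
  public static void main(String[] args) {
    int numRowsToRead = 8;
    long rowStartPosInBatch = 0L;                 // file position of the batch's first row
    Set<Long> deletedPositions = Set.of(2L, 6L);  // stand-in for PositionDeleteIndex

    int[] rowIdMapping = new int[numRowsToRead];
    boolean[] isDeleted = new boolean[numRowsToRead];
    int currentRowId = 0;
    for (int originalRowId = 0; originalRowId < numRowsToRead; originalRowId++) {
      if (!deletedPositions.contains(originalRowId + rowStartPosInBatch)) {
        rowIdMapping[currentRowId] = originalRowId; // keep the live row
        currentRowId++;
      } else {
        isDeleted[originalRowId] = true;            // feeds the _deleted metadata column
      }
    }

    // [0, 1, 3, 4, 5, 7]
    System.out.println(Arrays.toString(Arrays.copyOf(rowIdMapping, currentRowId)));
    // live rows: 6
    System.out.println("live rows: " + currentRowId);
    // [false, false, true, false, false, false, true, false]
    System.out.println(Arrays.toString(isDeleted));
  }
}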