益阳建设企业网站,seo新手教程,网站建设板块免费下载,订票网站开发公司高效改进#xff01;防止DataX从HDFS导入关系型数据库丢数据
针对DataX在从HDFS导入数据到关系型数据库过程中的数据丢失问题#xff0c;优化了分片处理代码。改动包括将之前单一分片处理逻辑重构为循环处理所有分片#xff0c;确保了每个分片数据都得到全面读取和传输防止DataX从HDFS导入关系型数据库丢数据
针对DataX在从HDFS导入数据到关系型数据库过程中的数据丢失问题优化了分片处理代码。改动包括将之前单一分片处理逻辑重构为循环处理所有分片确保了每个分片数据都得到全面读取和传输有效提升了数据导入的可靠性和效率。这些改动不仅解决了丢数据的问题还显著提高了处理多分片数据的性能。
背景
我们数据中台设计数据同步功能是datax完成在orc格式时datax从hdfs导数据到关系型数据库数据丢失而在textfile格式时丢失数据当文件超过250M多时会丢数据。因想使用orc格式节省数据空间提高spark运行效率需要解决这个问题。
问题 只读取了256M 左右的数据数据条数对不上导致hdfsorc格式导入数据到pgmysql等关系型数据库数据丢失。
解决
修改hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java
问题代码 InputSplit[] splits in.getSplits(conf, 1);RecordReader reader in.getRecordReader(splits[0], conf, Reporter.NULL);Object key reader.createKey();Object value reader.createValue();// 获取列信息List? extends StructField fields inspector.getAllStructFieldRefs();ListObject recordFields;while (reader.next(key, value)) {recordFields new ArrayListObject();for (int i 0; i columnIndexMax; i) {Object field inspector.getStructFieldData(value, fields.get(i));recordFields.add(field);修改后 // OrcInputFormat getSplits params numSplits not used, splits size block numbersInputSplit[] splits in.getSplits(conf, -1);for (InputSplit split : splits) {{RecordReader reader in.getRecordReader(split, conf, Reporter.NULL);Object key reader.createKey();Object value reader.createValue();// 获取列信息List? extends StructField fields inspector.getAllStructFieldRefs();ListObject recordFields;while (reader.next(key, value)) {recordFields new ArrayListObject();for (int i 0; i columnIndexMax; i) {Object field inspector.getStructFieldData(value, fields.get(i));recordFields.add(field);}transportOneRecord(column, recordFields, recordSender,taskPluginCollector, isReadAllColumns, nullFormat);}reader.close();点击参考查看
重新打包替换hdfsreader.jar即可
解析 新增循环处理所有分片的逻辑 之前的代码只处理了第一个分片(splits[0])现在改为了处理所有的分片。新增的部分如下 java
InputSplit[] splits in.getSplits(conf, -1);
for (InputSplit split : splits) {RecordReader reader in.getRecordReader(split, conf, Reporter.NULL);Object key reader.createKey();Object value reader.createValue();旧的逻辑是 java
InputSplit[] splits in.getSplits(conf, 1);
RecordReader reader in.getRecordReader(splits[0], conf, Reporter.NULL);
Object key reader.createKey();
Object value reader.createValue();这样改动的目的是同时处理多个分片从而提升数据读取的效率。 移除了重复的分片处理逻辑 不使用重复的分片处理逻辑 java
// OrcInputFormat getSplits params numSplits not used, splits size block numbers
InputSplit[] splits in.getSplits(conf, -1);代码块的重构 将读取分片、解析记录以及处理记录的逻辑放入一个循环中使代码更简洁、更易读 改之前 java
InputSplit[] splits in.getSplits(conf, 1);
RecordReader reader in.getRecordReader(splits[0], conf, Reporter.NULL);
Object key reader.createKey();
Object value reader.createValue();改后使用循环 java
InputSplit[] splits in.getSplits(conf, -1);
for (InputSplit split : splits) {RecordReader reader in.getRecordReader(split, conf, Reporter.NULL);Object key reader.createKey();Object value reader.createValue();处理每个记录字段并传输记录 保持对每条记录的字段读取并将其传输转移到了新的循环处理逻辑中 改之前 while (reader.next(key, value)) {recordFields new ArrayListObject();for (int i 0; i columnIndexMax; i) {Object field inspector.getStructFieldData(value, fields.get(i));recordFields.add(field);}transportOneRecord(column, recordFields, recordSender,taskPluginCollector, isReadAllColumns, nullFormat);
}
reader.close();改后 for (InputSplit split : splits) {RecordReader reader in.getRecordReader(split, conf, Reporter.NULL);Object key reader.createKey();Object value reader.createValue();List? extends StructField fields inspector.getAllStructFieldRefs();ListObject recordFields;while (reader.next(key, value)) {recordFields new ArrayListObject();for (int i 0; i columnIndexMax; i) {Object field inspector.getStructFieldData(value, fields.get(i));recordFields.add(field);}transportOneRecord(column, recordFields, recordSender,taskPluginCollector, isReadAllColumns, nullFormat);}reader.close();
}为什么是256M没有更改前他是按每个文件进行分割而在datax的配置中Java heap size 即默认xmx设置时256M所以当单个文件超过256M时超过的部分就被丢掉了造成数据缺失而更改后的是按hdfs block size 块的大小进行分割循环遍历所以直接修改xmx也能解决问题但是你要想万一文件超过128G那你不可能一直调大Java heap size所以按hdfs block size分割是合理的解决方案
reader单个分片InputSplit的大小
在DataX的数据读取过程中reader单个分片InputSplit的大小通常取决于底层存储系统和具体的配置参数。对于HDFSHadoop Distributed File System)를的读取分片大小主要由以下几个因素决定
HDFS块大小Block Size HDFS将文件分为多个块每个块通常是64MB、128MB或256MB大小具体大小可以通过HDFS的配置参数dfs.blocksize进行设置。DataX会根据这些块来创建分片也就是一个分片通常对应一个或多个HDFS块。文件本身的大小 如果文件比HDFS块小或者没有跨越多个块则一个文件可能只对应一个分片。DataX的任务配置 DataX允许在其配置文件中指定一些与分片相关的参数类似于Hadoop的mapreduce.input.fileinputformat.split.maxsize和mapreduce.input.fileinputformat.split.minsize这些参数可以影响分片的逻辑。InputFormat DataX使用的Hadoop的InputFormat也能控制分片的逻辑比如FileInputFormat、TextInputFormat和OrcInputFormat等。这些格式定义了如何分割输入数据结合文件大小和块大小来决定分片。
总结
主要改动是将之前只处理单个分片的逻辑重构为一个循环处理所有分片。这使代码更具扩展性和效率也适应不同的输入数据量。移除了无用且重复的注释和代码行以保持代码清晰。