电话用网站做综合布线,宁波做网站建设推广,目前h5页面的制作工具有哪些,团购酒店的网站建设Hudi之数据读写深入探究
1. Hudi数据写入
1-1. 写操作
Hudi数据湖中的数据更新、插入和删除操作#xff0c;是一个基于Apache Hadoop的库#xff0c;为数据湖提供了一种有效的方法来处理更新和增量数据#xff0c;并支持基于时间的快照和增量数据处理。Hudi支持三种主要的…Hudi之数据读写深入探究
1. Hudi数据写入
1-1. 写操作
Hudi数据湖中的数据更新、插入和删除操作是一个基于Apache Hadoop的库为数据湖提供了一种有效的方法来处理更新和增量数据并支持基于时间的快照和增量数据处理。Hudi支持三种主要的数据操作模式UPSERT更新或插入、INSERT插入、BULK_INSERT批量插入。
1-1-1. UPSERT更新或插入
UPSERT是指将数据更新到现有数据集中写入数据时检查数据是否已存在如果数据不存在则插入新数据这涉及到数据的读取、对比和合并。这种操作模式适用于需要实时更新或修复数据的场景。使用Hudi的UPSERT功能可以轻松地将新数据合并到现有数据集中并在必要时更新已存在的记录。这种方式确保了数据集的一致性和准确性。
性能特点
数据读取和对比 为了确定哪些记录需要更新Hudi必须读取现有数据文件。这会导致额外的I/O开销。索引维护 Hudi使用索引如布隆过滤器或哈希索引来快速查找现有记录减少数据读取时间但索引维护本身也增加了开销。合并过程 数据合并需要计算和重写数据文件这增加了CPU和I/O负载。
索引处理
索引查找 UPSERT操作依赖索引来快速查找现有记录以确定哪些记录需要更新。Hudi通常使用布隆过滤器或哈希索引来加速这一过程。索引维护 在数据更新和插入后Hudi还需要维护这些索引以确保后续查询和写入操作的效率。
适用场景 适用于需要频繁更新数据的场景如实时数据流处理、在线交易系统等。
1-1-2. INSERT插入
INSERT操作用于将新数据插入到数据湖中而不进行任何更新。这种模式适用于将新数据加载到数据湖中而无需考虑现有数据集的变化。使用Hudi的INSERT功能可以快速地将新数据加载到数据湖中并确保数据的完整性和可靠性。INSERT操作仅仅是将新数据写入到数据湖中所以无需检查或更新现有数据。是一个简单的追加操作。
性能特点
简化写入路径 不需要读取现有数据或进行对比直接写入新数据大大简化了写入路径。更低的I/O开销 由于没有数据读取和合并过程I/O开销显著降低。更高的吞吐量 因为写入过程简单INSERT操作通常能实现更高的吞吐量。
索引处理
无需索引查找 INSERT操作仅将新数据写入不需要查找现有记录。索引维护 虽然INSERT操作不需要查找索引但在数据写入后Hudi会更新索引以确保新数据可被后续查询和操作快速访问。
适用场景 适用于每次写入都是新数据的场景如批量数据加载、传感器数据收集等。
1-1-3. BULK_INSERT批量插入
BULK_INSERT操作类似于INSERT操作是优化的INSERT操作。可以一次性插入大量数据从而提高数据加载的效率。这种模式适用于需要高性能地加载大批量数据的场景。Hudi的BULK_INSERT功能可以有效地处理大规模数据加载任务并确保数据加载的速度和稳定性。
性能特点
优化的文件写入 BULK_INSERT通过优化数据文件的分配和组织方式最大化写入效率。并行写入 利用并行处理技术将大量数据拆分成多个部分并行写入极大地提高了写入速度。最小化索引开销 BULK_INSERT在写入过程中减少了索引维护的负担进一步提升了性能。
索引处理
无需索引查找 BULK_INSERT与INSERT类似不需要查找现有记录。延迟索引维护 为了优化批量写入性能BULK_INSERT通常会延迟索引更新或采用批量更新的方式减少索引维护的频率和开销。
适用场景 适用于需要高效地加载大量数据的场景如数据湖初始化、大规模数据迁移、数据仓库批量导入等。
1-1-4. 写操作性能对比总结
操作类型I/O开销CPU开销写入吞吐量磁盘空间利用内存消耗并发处理能力网络开销INSERT-写入最低几乎无需计算吞吐量较高低低一般低BULK_INSERT-批量写需要更多I/O有一些计算需求吞吐量最高较低较低高低UPSERT-更新或写入最高需要较高的计算资源吞吐量最低高较高低高
2. Hudi数据写流程
2-1. 数据写入步骤 数据准备 当新数据到达时我们称之为增量数据首先需要将其转换成Hudi的内部格式。这些数据通常会包含两个重要的键 记录键Record Key 唯一标识每条记录。分区键Partition Key 决定数据应该放在哪个“分区”类似于文件夹。 scala spark.sql(SELECT _hoodie_record_key,_hoodie_partition_path,role_cn,tianfu FROM hudi_ro_view).show()
-------------------------------------------------------------------
|_hoodie_record_key|_hoodie_partition_path| role_cn| tianfu|
-------------------------------------------------------------------
| 4| zhuangbei皮甲| 盗贼| 狂徒|刺杀|敏锐|
| 5| zhuangbei皮甲| 武僧| 酒仙|踏风|织雾|
| 6| zhuangbei皮甲| 德鲁伊|恢复|平衡|野性|守护|
| 7| zhuangbei皮甲|恶魔猎手| 复仇|浩劫|
| 11| zhuangbei板甲|死亡骑士| 鲜血|冰霜|邪恶|
| 12| zhuangbei板甲| 战士| 武器|狂暴|防护|
| 13| zhuangbei板甲| 圣骑士| 神圣|防护|惩戒|
| 8| zhuangbei锁甲| 猎人| 兽王|生存|射击|
| 9| zhuangbei锁甲| 萨满| 恢复|增强|元素|
| 10| zhuangbei锁甲| 龙人| 湮灭|恩护|增辉|
| 1| zhuangbei布甲| 法师| 冰法|火法|奥法|
| 2| zhuangbei布甲| 牧师| 神牧|戒律|暗牧|
| 3| zhuangbei布甲| 术士| 毁灭|痛苦|恶魔|
-------------------------------------------------------------------分区划分 Hudi会根据分区键来决定每条记录属于哪个分区。这一步确保数据按照分区键被组织好使得后续处理更加高效。 建立索引 Hudi使用索引来加速查找记录。常见的索引类型有 布隆过滤器Bloom Filter 快速检查某条记录是否可能在某个文件中。哈希索引Hash Index 在内存中快速定位记录键。 在执行UPSERT操作时索引用于确定哪些新记录需要更新已有记录哪些是全新的记录。 查找匹配记录 Hudi利用索引快速找到与新数据记录键匹配的旧数据文件。这一步骤大大减少了需要扫描的数据量从而提高处理速度。 读取现有数据 找到匹配的旧记录后Hudi会读取这些旧记录准备与新数据进行合并。 数据合并 Hudi将新数据与旧数据合并。这一过程可以自定义但通常包括以下情况 更新现有记录 用新数据中的信息更新旧记录。插入新记录 如果没有找到匹配的旧记录则直接插入新记录。 写入新的数据文件 合并后的数据会被写入新的数据文件中。这些文件通常按照分区组织以便于后续查询和管理。 更新索引 新数据文件写入后Hudi会更新索引反映最新的记录位置和状态。这确保未来的UPSERT和查询操作能够高效进行。 提交事务 Hudi将每次UPSERT操作视为一个事务。只有当所有步骤成功完成后Hudi才会提交事务确保数据一致性。如果有任何一步失败Hudi会回滚操作以保证数据不会出错。 清理和优化 最后Hudi会执行一些维护工作保持系统的高效运行 清理旧数据文件 删除不再需要的旧文件释放存储空间。文件压缩和合并 为了提高存储和查询性能Hudi可能会将多个小文件合并成一个大文件减少碎片。
2-2. Copy On Write (COW)
Copy On Write (COW) 这是一种写入数据的技术它不直接在原始数据上进行修改而是在写入新数据时创建副本Copy然后在副本上进行修改Write。
工作原理
数据准备和索引查找 和标准UPSERT流程一致首先准备新数据并通过索引查找现有数据文件中需要更新的记录。数据合并 读取需要更新的现有数据文件将新数据和旧数据进行合并。写入新文件 合并后的数据被写入新的数据文件旧的数据文件被标记为无效或删除。索引更新 更新索引以反映新数据文件的位置和状态。事务提交和清理 提交事务并清理旧数据文件。
性能特征
写入开销 写入性能相对较低因为每次更新都需要重写整个数据文件。查询性能 查询性能较好因为每个数据文件都是完整的无需额外的合并操作。存储效率 存储效率相对较低尤其是在频繁更新的场景下因为每次更新都会产生大量的新文件和旧文件碎片。
适用场景 适用于读操作频繁而写操作相对较少的场景例如批量数据加载、定期数据更新等。
Copy On Write模式的UPSERT
1首先对记录按照记录键record key进行去重确保每个键值只出现一次。
2创建索引HoodieKey HoodieRecordLocation这个索引用于区分哪些记录需要更新哪些需要插入。通过这个索引可以快速定位到记录的位置和状态。
3对于需要更新的记录找到其对应的最新文件FileSlice的基本文件base file将更新后的数据与原始数据进行合并merge然后写入新的基本文件新的FileSlice。这个步骤确保了数据的一致性和持久性。
4对于需要插入的记录会扫描当前分区的所有小文件SmallFile即小于一定大小的基本文件然后将新记录与这些小文件进行合并生成新的FileSlice。如果当前分区没有小文件则直接创建新的文件组FileGroup和FileSlice来存储插入的记录。
2-3. Merge On Read (MOR)
Merge On Read (MOR)这是一种写入数据的技术它在读取数据时进行合并而不是在写入时立即合并。
工作原理
数据准备和索引查找 和标准UPSERT流程一致首先准备新数据并通过索引查找现有数据文件中需要更新的记录。数据写入增量文件 新数据和更新数据被写入增量文件delta files而不是重写整个数据文件。写入基准文件 基准数据文件base files保持不变仅当数据文件达到一定条件时才会进行合并。合并操作 查询时Hudi会将增量文件和基准文件合并on-read merge以提供最新的数据视图。索引更新 更新索引以反映增量文件的位置和状态。事务提交和清理 提交事务并根据配置策略定期合并和清理增量文件和基准文件。
性能特征
写入开销 写入性能较高因为新数据只需写入增量文件无需重写整个数据文件。查询性能 查询性能相对较低因为需要在读取时进行合并操作尤其是在增量文件较多时。存储效率 存储效率较高因为更新操作不会产生大量的重写文件但需要定期合并增量文件以减少碎片。
适用场景 适用于写操作频繁、实时数据更新的场景例如实时数据流处理、事件驱动系统等。
Merge On Read模式的UPSERT
1首先对记录按照记录键record key进行去重可选确保每个键值只出现一次。
2创建索引HoodieKey HoodieRecordLocation这个索引用于区分哪些记录需要更新哪些需要插入。通过这个索引可以快速定位到记录的位置和状态。
3对于需要插入的记录
如果日志文件log file不可建立索引默认情况系统会尝试合并分区内最小的基本文件不包含日志文件的FileSlice生成新的FileSlice。如果没有基本文件则创建新的文件组FileGroup、FileSlice和基本文件base file。如果日志文件可建立索引则尝试追加小的日志文件。如果没有小的日志文件则新建一个FileGroup、FileSlice和基本文件。
4对于需要更新的记录
写入对应的文件组FileGroup和文件片段FileSlice直接追加最新的日志文件。如果最新的日志文件是当前最小的小文件则会合并基本文件生成新的文件片段。
5当日志文件的大小达到一定阈值时会创建一个新的日志文件roll over。 2-4. COW vs MOR 对比表
维度Copy On Write (COW)Merge On Read (MOR)写入性能中等每次写入都需要重写整个文件因此写入开销较大。高新数据写入增量文件无需重写基准文件因此写入速度快。查询性能高查询时数据文件是完整的不需要额外的合并操作读取速度快。中等到低查询时需要合并增量文件和基准文件尤其是增量文件较多时查询速度可能较慢。存储效率中等频繁重写文件可能导致存储碎片化增加存储需求。高通过增量文件和基准文件的方式减少了重写文件的开销存储效率更高。适用场景读操作频繁写操作较少适用于批量数据加载、定期更新等场景。写操作频繁需实时数据更新适用于实时数据流处理、事件驱动系统等场景。实现复杂度较低操作相对简单不需要处理增量文件和基准文件的合并。较高需要处理增量文件和基准文件的合并逻辑较为复杂。资源消耗I/O和存储开销较高每次写入都涉及重写整个文件导致较高的I/O和存储消耗。I/O消耗较低写入只需写增量文件但查询时的合并操作可能增加计算负担。数据一致性高数据文件是完整的无需额外处理即可保证一致性。高读取时需要合并操作才能获得一致的数据视图但通过事务保证一致性。延迟写入延迟较高需要重写文件导致写入操作延迟增加。写入延迟较低增量文件写入速度快延迟较低。合并频率不需要频繁合并文件生成后即为最终文件。需要定期合并增量文件和基准文件的合并需要定期进行以优化查询性能。数据恢复较简单数据文件直接反映最新数据恢复操作较简单。较复杂需要合并增量文件和基准文件恢复操作更复杂。文件管理较简单主要关注删除旧文件和清理碎片。较复杂需要处理增量文件和基准文件的关系。实时分析支持有限适合批处理和定期分析。强大支持适合实时数据流处理和分析。存储空间可能较高由于频繁的文件重写存储需求较大。相对较低通过增量文件管理更高效减少存储需求。数据老化需要定期清理老旧文件需要手动清理。自动管理通过增量文件和基准文件的合并自动管理老旧数据。
2-5. HoodieKey
HoodieKey是Hudi中一个重要的概念HoodieKey就像是数据的身份证它帮助Hudi准确地管理和追踪数据的变更它是数据的唯一标识用于在数据湖中追踪和管理数据的变更。想象一下你有一个大仓库里面摆放着各种货物而每一件货物都有一个独特的编号这个编号就是HoodieKey。它告诉你这件货物的唯一身份。
现在这个仓库经常有新货物进来也有旧货物出去这就是数据的变更。当有新的货物进来时你需要确保它有一个唯一的编号这样你才能在仓库中找到它。而当有货物出去时你也需要记录下它的编号以便知道它是什么时候离开了仓库。
HoodieKey就像是每件货物的编号一样它帮助Hudi追踪和管理数据的变更。每条数据都有一个唯一的HoodieKey用于标识它的身份。当有新数据写入时Hudi会根据HoodieKey来确定它是新增还是更新从而正确地处理数据。同时Hudi还可以根据HoodieKey来进行数据的查询、合并和版本控制确保数据的一致性和完整性。
HUDI的Key生成策略通常包括以下几个方面
Record Key记录键每条记录都有一个唯一的Record Key用于在数据集中唯一标识一条记录。通常情况下Record Key由用户指定并且应该具有足够的唯一性以便HUDI可以根据Record Key来进行数据的插入、更新和删除操作。Partition Key分区键HUDI可以根据Partition Key将数据分割成不同的分区以便更高效地管理和查询数据。Partition Key通常根据业务需求来选择可以是日期、地理位置、业务类型等。HUDI根据Partition Key来组织数据存储使得可以针对不同的分区进行并行处理。FileID文件标识HUDI在存储数据时会将数据分成多个文件每个文件都有一个唯一的FileID用于标识。FileID通常由HUDI根据文件创建时间或其他规则生成确保每个文件都具有唯一性。Commit Time提交时间HUDI还可以根据数据的提交时间来生成Key以便对数据进行版本控制和追溯。通过Commit Time可以了解到每条数据的写入时间以及数据的变更历史。
综合利用Record Key、Partition Key、FileID和Commit Time等信息HUDI可以生成唯一的Key来标识和管理数据的变更从而实现对大规模数据变更的高效管理和处理。 3. Hudi的删除策略
3-1. 物理删除Physical Delete
物理删除直接从数据文件中移除记录不再保留这些数据。这种删除方式可以减少存储空间但可能影响查询性能特别是在需要频繁合并数据文件时。
工作原理
标记待删除记录首先Hudi将待删除的记录标记为删除状态。通过记录键Record Key识别待删除的记录。生成删除文件将标记删除的记录写入一个删除日志文件Delete Log File。数据文件合并在下一次Compaction合并操作时删除标记的记录将从数据文件中移除。
优缺点
优点可以减少存储空间最终只保留有效的数据。缺点需要频繁合并数据文件可能会影响写入和查询性能。
3-2. 逻辑删除Logical Delete
逻辑删除只是将记录标记为删除状态但不实际从数据文件中移除。这种方式通常通过添加一个删除标记字段来实现在查询时过滤掉这些标记为删除的记录。
工作原理
标记删除状态在待删除的记录上添加一个删除标记字段设置为true或其他表示删除的状态。查询过滤查询时通过过滤条件排除标记为删除的记录。保留数据实际数据依然保存在文件中只是在查询时不再返回这些记录。
优缺点
优点删除操作快速且不需要合并数据文件适用于需要保留历史数据的场景。缺点会占用额外的存储空间因为删除的数据实际上仍然存在。
4. Hudi读数据
4-1. 快照查询Snapshot Query
快照查询提供了一个数据集的最新视图包括所有已提交的插入、更新和删除操作。它是最常用的查询模式适用于需要读取最新完整数据的场景。
读取所有 partiiton 下每个 FileGroup 最新的 FileSlice 中的文件Copy On Write 表读 parquet 文件Merge On Read 表读 parquet log 文件
工作原理
数据版本管理Hudi通过时间戳管理数据版本快照查询只会读取最新提交的文件。合并操作在Merge On ReadMOR模式下快照查询会将基准文件Base Files和增量文件Delta Files合并确保返回的数据是最新的。查询执行快照查询直接读取最新的数据文件无需用户手动处理版本和合并。
示例
val hudiOptions Map(hoodie.datasource.query.type - snapshot
)val snapshotDF spark.read.format(hudi).options(hudiOptions).load(path_to_hudi_table)
snapshotDF.show()优缺点
优点简单易用直接获取最新数据。缺点在MOR模式下合并操作可能会增加查询延迟。
4-2. 增量查询Incremental Query
增量查询用于获取从指定时间点或提交开始的数据变更包括插入、更新和删除操作。适用于需要增量加载数据到其他系统或进行增量处理的场景。
当前的 Spark data source 可以指定消费的起始和结束 commit 时间读取 commit 增量的数据集。但是内部的实现不够高效拉取每个 commit 的全部目标文件再按照系统字段 hoodie_commit_time apply 过滤条件
工作原理
提交点Commit Point管理Hudi为每次写操作生成一个唯一的提交点标识增量查询可以基于这个提交点进行数据过滤。读取变更数据增量查询只读取从上一个提交点以来的变更数据包括新增、更新和删除的记录。
示例
val hudiOptions Map(hoodie.datasource.query.type - incremental,hoodie.datasource.read.begin.instanttime - 20230101000000,hoodie.datasource.read.end.instanttime - 20230102000000
)val incrementalDF spark.read.format(hudi).options(hudiOptions).load(path_to_hudi_table)
incrementalDF.show()优缺点
优点高效地获取数据变更适合实时数据同步和流处理。缺点需要维护提交点信息管理复杂度较高。
4-3. 流式查询Streaming Query
流式查询通过持续监控Hudi表的变更实现实时数据流处理。适用于需要实时响应数据变化的场景如实时分析、实时监控等。可用于同步 CDC 数据日常的数据同步 ETL pipeline。
工作原理
流处理引擎集成流式查询通常与流处理引擎如Apache Spark Structured Streaming集成自动处理数据变更。增量加载通过持续增量加载数据实时反映数据变化。
示例
import org.apache.spark.sql.streaming.Triggerval streamingDF spark.readStream.format(hudi).option(hoodie.datasource.query.type, incremental).load(path_to_hudi_table)val query streamingDF.writeStream.format(console).trigger(Trigger.ProcessingTime(10 seconds)).start()query.awaitTermination()优缺点
优点实时处理数据变更适用于实时数据处理和分析。缺点需要设置流处理环境管理复杂度较高。
4-4. 查询方式对比分析
维度Snapshot QueryIncremental QueryStreaming Query数据视图最新视图包含所有已提交的变更从指定提交点开始的增量变更持续的实时数据变更适用场景读取最新完整数据批处理和定期分析增量加载数据到其他系统增量处理实时分析、实时监控实现复杂度低直接读取最新数据中需要管理提交点信息高需要设置和管理流处理环境查询延迟可能较高特别是MOR模式较低仅读取变更数据极低实时反映数据变化资源消耗中等视数据量和合并操作复杂度而定较低仅处理变更数据较高持续处理实时数据数据一致性高返回最新提交的一致性数据高基于提交点的一致性数据高实时保证数据一致性性能优化通过索引和合并策略优化查询性能通过选择合适的提交点和过滤条件优化性能通过设置合适的触发器和流处理参数优化性能
5. 魔兽世界demo代码演示
现在有一个MySQL数据库存储了一张魔兽世界的中的职业信息表现在通过spark把MySQL的数据读取然后写入到hudi通过这个简单的demo示例体会整个过程 代码运行环境这里不过多介绍最基础的实验环境可以是spark单机hudi依赖MySQL连接器放入spark的jars目录中存储使用本地存储 5-1. 环境准备
MySQL信息
MariaDB [wow] desc wow_info;
---------------------------------------------------------------
| Field | Type | Null | Key | Default | Extra |
---------------------------------------------------------------
| id | int(11) | NO | PRI | NULL | auto_increment |
| role | varchar(255) | YES | | NULL | |
| role_cn | varchar(255) | YES | | NULL | |
| role_pinyin | varchar(255) | YES | | NULL | |
| zhuangbei | varchar(255) | YES | | NULL | |
| tianfu | varchar(255) | YES | | NULL | |
---------------------------------------------------------------
MariaDB [wow] select * from wow_info;
-----------------------------------------------------------------------------
| id | role | role_cn | role_pinyin | zhuangbei | tianfu |
-----------------------------------------------------------------------------
| 1 | fs | 法师 | fashi | 布甲 | 冰法|火法|奥法 |
| 2 | ms | 牧师 | mushi | 布甲 | 神牧|戒律|暗牧 |
| 3 | ss | 术士 | shushi | 布甲 | 毁灭|痛苦|恶魔 |
| 4 | dz | 盗贼 | daozei | 皮甲 | 狂徒|刺杀|敏锐 |
| 5 | ws | 武僧 | wuseng | 皮甲 | 酒仙|踏风|织雾 |
| 6 | xd | 德鲁伊 | xiaode | 皮甲 | 恢复|平衡|野性|守护 |
| 7 | dh | 恶魔猎手 | emolieshou | 皮甲 | 复仇|浩劫 |
| 8 | lr | 猎人 | lieren | 锁甲 | 兽王|生存|射击 |
| 9 | sm | 萨满 | saman | 锁甲 | 恢复|增强|元素 |
| 10 | long | 龙人 | longren | 锁甲 | 湮灭|恩护|增辉 |
| 11 | dk | 死亡骑士 | siwangqishi | 板甲 | 鲜血|冰霜|邪恶 |
| 12 | zs | 战士 | zhanshi | 板甲 | 武器|狂暴|防护 |
| 13 | sq | 圣骑士 | shengqi | 板甲 | 神圣|防护|惩戒 |
-----------------------------------------------------------------------------5-2. 代码开发
编写Scala任务
[rootwangting ~]# mkdir -p ~/hudi_test/
[rootwangting ~]# cd hudi_test/
[rootwangting hudi_test]# vim MySQLToHudi.scalaMySQLToHudi.scala内容
import org.apache.spark.sql.{SparkSession, DataFrame}object MySQLToHudi {def main(args: Array[String]): Unit {val spark SparkSession.builder().appName(MySQLToHudi).config(spark.serializer, org.apache.spark.serializer.KryoSerializer).getOrCreate()// 设置MySQL连接参数val mysqlOptions Map(url - jdbc:mysql://wangtingmysql:3306/wow,dbtable - wow_info,user - root,password - 123456)// 读取MySQL数据val mysqlDF: DataFrame spark.read.format(jdbc).options(mysqlOptions).load()// 定义Hudi表路径一般为hdfs路径val hudiTablePath file:///root/hudi_test/hudi_wow_info// 写入Hudi表mysqlDF.write.format(org.apache.hudi).option(hoodie.table.name, hudi_wow_info).option(hoodie.datasource.write.operation, bulk_insert).option(hoodie.datasource.write.recordkey.field, id).option(hoodie.datasource.write.precombine.field, id).option(hoodie.datasource.write.partitionpath.field, zhuangbei).option(hoodie.datasource.write.table.type, COPY_ON_WRITE).option(hoodie.datasource.write.hive_style_partitioning, true).option(hoodie.upsert.shuffle.parallelism, 4).option(hoodie.bulkinsert.shuffle.parallelism, 4).mode(overwrite).save(hudiTablePath)spark.stop()}
}代码中将存储文件写入到本地file:///root/hudi_test/hudi_wow_info环境便于测试查看 5-3. 上线运行
运行
spark-shell \--conf spark.serializerorg.apache.spark.serializer.KryoSerializer \--conf spark.sql.catalog.spark_catalogorg.apache.spark.sql.hudi.catalog.HoodieCatalog \--conf spark.sql.extensionsorg.apache.spark.sql.hudi.HoodieSparkSessionExtension \--packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.0,mysql:mysql-connector-java:5.1.37# 加载开发完毕的MySQLToHudi功能:load
scala :load /root/hudi_test/MySQLToHudi.scala
Loading /root/hudi_test/MySQLToHudi.scala...
import org.apache.spark.sql.{SparkSession, DataFrame}
defined object MySQLToHudi
# 执行main方法进行数据读写(对象名加main方法)
scala MySQLToHudi.main(Array())
498361 [Executor task launch worker for task 0.0 in stage 9.0 (TID 227)] WARN org.apache.hadoop.metrics2.impl.MetricsConfig执行完毕后查看存储文件
[rootwangting ~]# cd /root/hudi_test/hudi_wow_info
[rootwangting hudi_wow_info]# ll
total 16
drwxr-xr-x 2 root root 4096 May 28 16:36 zhuangbei布甲
drwxr-xr-x 2 root root 4096 May 28 16:36 zhuangbei板甲
drwxr-xr-x 2 root root 4096 May 28 16:36 zhuangbei皮甲
drwxr-xr-x 2 root root 4096 May 28 16:36 zhuangbei锁甲
[rootwangting hudi_wow_info]# ls zhuangbei布甲/
1a0d1d0d-123a-4148-bde4-3d19ecab9668-0_0-0-0_20240528163634383.parquet可以看到文件的存储格式 1a0d1d0d-123a-4148-bde4-3d19ecab9668-0_0-0-0_20240528163634383.parquet 1a0d1d0d-123a-4148-bde4-3d19ecab9668-0 - fileId 0-0-0 - writeToken 20240528163634383 - instantTime parquet - fileExtension 5-4. Hudi读取数据
Apache Spark 来读取 Hudi 表 读取数据
// 启动
spark-shell \--conf spark.serializerorg.apache.spark.serializer.KryoSerializer \--conf spark.sql.catalog.spark_catalogorg.apache.spark.sql.hudi.catalog.HoodieCatalog \--conf spark.sql.extensionsorg.apache.spark.sql.hudi.HoodieSparkSessionExtension \--packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.0,mysql:mysql-connector-java:5.1.37// 读取数据
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Rowval basePath file:///root/hudi_test/hudi_wow_info
val hoodieROViewDF: Dataset[Row] spark.read.format(org.apache.hudi).load(s$basePath/*)// 查询数据
hoodieROViewDF.createOrReplaceTempView(hudi_ro_view)
spark.sql(SELECT * FROM hudi_ro_view).show()命令行展示
[rootwangting jars]# spark-shell \--conf spark.serializerorg.apache.spark.serializer.KryoSerializer \--conf spark.sql.catalog.spark_catalogorg.apache.spark.sql.hudi.catalog.HoodieCatalog \--conf spark.sql.extensionsorg.apache.spark.sql.hudi.HoodieSparkSessionExtension \--packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.0,mysql:mysql-connector-java:5.1.37scala import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Datasetscala import org.apache.spark.sql.Row
import org.apache.spark.sql.Rowscala val basePath file:///root/hudi_test/hudi_wow_info
basePath: String file:///root/hudi_test/hudi_wow_infoscala val hoodieROViewDF: Dataset[Row] spark.read.format(org.apache.hudi).load(s$basePath/*)
119318 [main] WARN org.apache.hudi.common.config.DFSPropertiesConfiguration - Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
119328 [main] WARN org.apache.hudi.common.config.DFSPropertiesConfiguration - Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
hoodieROViewDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 9 more fields]scala hoodieROViewDF.createOrReplaceTempView(hudi_ro_view)scala spark.sql(SELECT * FROM hudi_ro_view).show()
---------------------------------------------------------------------------------------------------------------------------------------------------------
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|role| role_cn|role_pinyin| tianfu|zhuangbei|
---------------------------------------------------------------------------------------------------------------------------------------------------------
| 20240528163634383|20240528163634383...| 4| zhuangbei皮甲|1a0d1d0d-123a-414...| 4| dz| 盗贼| daozei| 狂徒|刺杀|敏锐| 皮甲|
| 20240528163634383|20240528163634383...| 5| zhuangbei皮甲|1a0d1d0d-123a-414...| 5| ws| 武僧| wuseng| 酒仙|踏风|织雾| 皮甲|
| 20240528163634383|20240528163634383...| 6| zhuangbei皮甲|1a0d1d0d-123a-414...| 6| xd| 德鲁伊| xiaode|恢复|平衡|野性|守护| 皮甲|
| 20240528163634383|20240528163634383...| 7| zhuangbei皮甲|1a0d1d0d-123a-414...| 7| dh|恶魔猎手| emolieshou| 复仇|浩劫| 皮甲|
| 20240528163634383|20240528163634383...| 11| zhuangbei板甲|1a0d1d0d-123a-414...| 11| dk|死亡骑士|siwangqishi| 鲜血|冰霜|邪恶| 板甲|
| 20240528163634383|20240528163634383...| 12| zhuangbei板甲|1a0d1d0d-123a-414...| 12| zs| 战士| zhanshi| 武器|狂暴|防护| 板甲|
| 20240528163634383|20240528163634383...| 13| zhuangbei板甲|1a0d1d0d-123a-414...| 13| sq| 圣骑士| shengqi| 神圣|防护|惩戒| 板甲|
| 20240528163634383|20240528163634383...| 8| zhuangbei锁甲|1a0d1d0d-123a-414...| 8| lr| 猎人| lieren| 兽王|生存|射击| 锁甲|
| 20240528163634383|20240528163634383...| 9| zhuangbei锁甲|1a0d1d0d-123a-414...| 9| sm| 萨满| saman| 恢复|增强|元素| 锁甲|
| 20240528163634383|20240528163634383...| 10| zhuangbei锁甲|1a0d1d0d-123a-414...| 10|long| 龙人| longren| 湮灭|恩护|增辉| 锁甲|
| 20240528163634383|20240528163634383...| 1| zhuangbei布甲|1a0d1d0d-123a-414...| 1| fs| 法师| fashi| 冰法|火法|奥法| 布甲|
| 20240528163634383|20240528163634383...| 2| zhuangbei布甲|1a0d1d0d-123a-414...| 2| ms| 牧师| mushi| 神牧|戒律|暗牧| 布甲|
| 20240528163634383|20240528163634383...| 3| zhuangbei布甲|1a0d1d0d-123a-414...| 3| ss| 术士| shushi| 毁灭|痛苦|恶魔| 布甲|
---------------------------------------------------------------------------------------------------------------------------------------------------------对比原MySQL表结构多了_hoodie_commit_time、_hoodie_commit_seqno、_hoodie_record_key、_hoodie_partition_path、_hoodie_file_name _hoodie_commit_time: 这个字段记录了最后一次数据提交的时间戳。每次写入操作如插入、更新或删除都会生成一个新的提交这个时间戳就是该提交的记录时间。它对于数据恢复和理解数据的更新历史非常重要。_hoodie_commit_seqno: 这个字段包含了一个序列号用于标识每个提交的顺序。序列号是按提交顺序递增的它有助于确定不同提交之间的先后顺序并且在处理增量数据时特别有用。_hoodie_record_key: 这是记录键Record Key的字段通常是由表的主键或者由用户定义的某个唯一标识符生成的。在 Hudi 中这个字段用于唯一标识一条记录并且在数据去重、更新和删除操作中起到关键作用。_hoodie_partition_path: 这个字段表示记录所属的分区路径。在 Hudi 中数据可以按照分区进行组织每个分区对应文件系统中的一个目录。这个字段记录了记录所在的分区信息有助于在查询时进行分区剪枝提高查询效率。_hoodie_file_name: 这个字段包含了存储该记录的 Hudi 文件的文件名。在 Hudi 的文件组织结构中每个提交都会生成新的数据文件文件名通常包含了提交的时间戳和其他信息有助于追踪数据的来源和版本。 这些字段是 Hudi 表的元数据的一部分它们对于 Hudi 的数据管理、查询优化、增量处理等功能至关重要。在进行数据分析时这些字段可以提供额外的信息帮助用户更好地理解和操作数据。 关联上方的底层存储文件 可以看到文件名的fileId、instantTime其实就是对应着数据存储中的_hoodie_file_name、_hoodie_commit_time