dedecms网站安装,网站死链是什么,绍兴易网网站开发,织梦网站添加下载备注#xff1a;
By 远方时光原创#xff0c;可转载#xff0c;不能复制到其他平台 背景#xff1a;做流批一体#xff0c;湖仓一体的大数据架构#xff0c;常见的做法就是
数据源-spark Streaming-ODS#xff08;数据湖#xff09;-spark streaming-
By 远方时光原创可转载不能复制到其他平台 背景做流批一体湖仓一体的大数据架构常见的做法就是
数据源-spark Streaming-ODS数据湖-spark streaming-DWD数据湖-...
那么数据源-spark Streaming-ODS以这段为例在数据源通过spark structured streaming写入ODS在数据湖Delta Lake)落盘时候必然会产生很多小文件 目的
为了在批处理spark-sql运行更快也避免因为小文件而导致报错
影响
WARNING: Failed to connect to /172.16.xx.xx:9866 for block, add to deadNodes and continue. java.net.SocketException: Too many open files
1.小文件在批处理数据IO消耗巨大程序可能卡死
2.小文件块都有对应的元数据元数据放在NameNode导致需要的内存大大增大增加NameNode压力这样会限制了集群的扩展。
3.在HDFS或者对象储存中小文件的读写处理速度要远远小于大文件寻址耗时 解决思路
事前
1.避免写入时候产生过多小文件
做好分区partitionBy(年月日), 避免小文件过于分散Trigger触发时间可以设置为1分钟这样会攒一批一写入避免秒级别写入而产生大量小文件但是使用spark structured 想要做real-time不能这样只适合做准实时
2.打开自适应框架的开关
spark.sql.adaptive.enabled true
3.通过spark的coalesce()方法和repartition()方法
val rdd2 rdd1.coalesce(8, true) //true表示是否shuffle
val rdd3 rdd1.repartition(8)coalescecoalesce()方法的作用是返回指定一个新的指定分区的Rdd如果是生成一个窄依赖的结果那么可以不发生shuffle分区的数量发生激烈的变化计算节点不足不设置true可能会出错。 repartitioncoalesce()方法shuffle为true的情况。
事后小文件引起已经产生
1优化 Delta 表的写入避免小文件产生
在开源版 Spark 中每个 executor 向 partition 中写入数据时都会创建一个表文件进行写入最终会导致一个 partition 中产生很多的小文件。
Databricks 对 Delta 表的写入过程进行了优化对每个 partition使用一个专门的 executor 合并其他 executor 对该 partition 的写入从而避免了小文件的产生。
该特性由表属性 delta.autoOptimize.optimizeWrite 来控制
可以在创建表时指定
CREATE TABLE student (id INT, name STRING)
TBLPROPERTIES (delta.autoOptimize.optimizeWrite true);
也可以修改表属性
ALTER TABLE table_name
SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite true);
该特性有两个优点通过减少被写入的表文件数量提高写数据的吞吐量避免小文件的产生提升查询性能。
其缺点也是显而易见的由于使用了一个 executor 来合并表文件的写入从而降低了表文件写入的并行度此外多引入的一层 executor 需要对写入的数据进行 shuffle带来额外的开销。因此在使用该特性时需要对场景进行评估
该特性适用的场景频繁使用 MERGEUPDATEDELETEINSERT INTOCREATE TABLE AS SELECT 等 SQL 语句的场景
该特性不适用的场景写入 TB 级以上数据。
2.自动合并小文件
在流处理场景中比如流式数据入湖场景下需要持续的将到达的数据插入到 Delta 表中每次插入都会创建一个新的表文件用于存储新到达的数据假设每10s触发一次那么这样的流处理作业一天产生的表文件数量将达到8640个且由于流处理作业通常是 long-running 的运行该流处理作业100天将产生上百万个表文件。这样的 Delta 表仅元数据的维护就是一个很大的挑战查询性能更是急剧恶化。
为了解决上述问题Databricks 提供了小文件自动合并功能在每次向 Delta 表中写入数据之后会检查 Delta 表中的表文件数量如果 Delta 表中的小文件size 128MB 的视为小文件数量达到阈值则会执行一次小文件合并将 Delta 表中的小文件合并为一个新的大文件。
该特性由表属性 delta.autoOptimize.autoCompact 控制和特性 delta.autoOptimize.optimizeWrite 相同可以在创建表时指定也可以对已创建的表进行修改。自动合并的阈值由 spark.databricks.delta.autoCompact.minNumFiles 控制默认为50即小文件数量达到50会执行表文件合并合并后产生的文件最大为128MB如果需要调整合并后的目标文件大小可以通过调整配置 spark.databricks.delta.autoCompact.maxFileSize 实现。
3手动合并小文件我常用每天定时运行合并分区内小文件再去处理批任务
自动小文件合并会在对 Delta 表进行写入且写入后表中小文件达到阈值时被触发。除了自动合并之外Databricks 还提供了 Optimize 命令使用户可以手动合并小文件优化表结构使得表文件的结构更加紧凑。在实现上 Optimize 使用 bin-packing 算法该算法不但会合并表中的小文件且合并后生成的表文件也更均衡表文件大小相近。例如我们要对 Delta 表 student 的表文件进行优化仅需执行如下命令即可实现Optimize 命令不但支持全表小文件的合并还支持特定的分区的表文件的合并
OPTIMIZE student WHERE date 2024-01-01 附加
面试官可能会问我运行optimize合并小文件但是小文件太多了直接卡死运行不了程序某互联网面试题
回答
1.首先停掉程序这里注意deltalake因为有历史版本这个概念所以不存在运行一半覆盖原来版本情况可以基于上一个版本重新运行考点
2.第二点大数据思想分而治之“分”即把复杂的任务分解为若干个“简单的任务”来处理。
OPTIMIZE student WHERE date 2024-01-01 and date 2024-01-02
因为前面做了partitionby(年月日)那么缩小optimize范围在遍历这个月的每一天日期分治处理
3.第三点大数据思想自己不行找兄弟加节点加计算资源