加拿大购物网站排名,啥十小企业网站建设,合肥企业宣传片制作公司,网站流量 名词背景
在大环境不好的情况下,本司也开始了“降本增效”#xff0c;本文探讨一下#xff0c;在这种背景下 Spark怎么做的降本增效。 Yarn 基于 EMR CPU 是xlarge#xff0c;也就是内存和核的比例在7:1左右的 #xff0c;磁盘是基于 NVMe SSD Spark 3.5.0(也是刚由3.1 升级而…背景
在大环境不好的情况下,本司也开始了“降本增效”本文探讨一下在这种背景下 Spark怎么做的降本增效。 Yarn 基于 EMR CPU 是xlarge也就是内存和核的比例在7:1左右的 磁盘是基于 NVMe SSD Spark 3.5.0(也是刚由3.1 升级而来 ) JDK 8 这里为什么强调 NVMe 因为相比于 HDD 来说他的磁盘IO有更高的读写速度。 导致我们在 Spark上 做的一些常规优化是不起效果的
注意如没特别说明 P99 P95 avg等时间单位是秒
优化手段
调整JVM GC策略
因为我们内部存在于类似 Apache kyuubi这种 long running的服务而且内存都是20GB起步所以第一步就想到调整 CMS 策略为 G1 或者 Parallel. 测试结果
GC类型p100p99p95p90p50avgtask_countCMS95692085.3800000000047688208343.8580512068836887363G193802164.9100000000035853378652.2275451140619788110Parallel109192192.970000000001888393652.0600647889858788904
从结果上来看CMS在 吞吐上比 G1和Parallel 都要好的,附上 JVM的一些关键参数如下 Spark driver端的配置都是一样
spark.driver.extraJavaOptions: -verbose:gc -XX:UseParNewGC -XX:MaxTenuringThreshold15 -XX:UseCMSCompactAtFullCollection -XX:CMSFullGCsBeforeCompaction5 -XX:UseConcMarkSweepGC -XX:CMSParallelRemarkEnabled -XX:UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction70 -XX:ParallelRefProcEnabled -XX:ExplicitGCInvokesConcurrent -XX:PrintGCDetails -XX:PrintGCTimeStamps -XX:HeapDumpOnOutOfMemoryError -XX:HeapDumpPathheapdump,CMS的 Executor 配置如下:
spark.executor.extraJavaOptions: -verbose:gc -XX:UseParNewGC -XX:MaxTenuringThreshold15 -XX:UseCMSCompactAtFullCollection -XX:CMSFullGCsBeforeCompaction5 -XX:UseConcMarkSweepGC -XX:CMSParallelRemarkEnabled -XX:UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction70 -XX:ParallelRefProcEnabled -XX:ExplicitGCInvokesConcurrent -XX:PrintGCDetails -XX:PrintGCTimeStamps -XX:HeapDumpOnOutOfMemoryError -XX:HeapDumpPathheapdumpG1的 Executor 配置如下:
spark.executor.extraJavaOptions: -verbose:gc -XX:UseG1GC -XX:ParallelGCThreads10 -XX:ConcGCThreads5 -XX:G1ReservePercent15 -XX:MaxTenuringThreshold15 -XX:ParallelRefProcEnabled -XX:ExplicitGCInvokesConcurrent -XX:PrintGCDetails -XX:PrintGCTimeStamps -XX:HeapDumpOnOutOfMemoryError -XX:HeapDumpPathheapdumpParellel 的 Executor 配置如下:
spark.executor.extraJavaOptions: -Dcarbon.properties.filepathcarbon.properties -Dlog4j.configurationxql-log4j.properties -verbose:gc -XX:UseParallelGC -XX:UseParallelOldGC -XX:ParallelGCThreads10 -XX:UseAdaptiveSizePolicy -XX:ParallelRefProcEnabled -XX:PrintGCDetails -XX:PrintGCTimeStamps -XX:HeapDumpOnOutOfMemoryError -XX:HeapDumpPathheapdump增发并发度
在对任务的运行时长进行统计的时候我们发现绝大部分任务都是运行时间在5分钟左右而与此同时在任务高峰期的时候整体任务的等待数量一直在1800左右这种情况下增大任务的并发度能够很好的增加整体的吞吐实践中起码后20%的提速具体的运行时间如下
p99p95p90p50avg239194656238193.19207605335595
引入ESS或者RSS
对于 ESS 可以参考:push-based shuffle to improve shuffle efficiency,但是这里请注意一点该问题的提出点是基于 HDD 类型的磁盘的因为我们现在是基于 NVMe SSD 的所以几乎没有什么提升。 对于 RSS 可以参考:Apache celeborn,结果也是差不多。
引入向量化插件
对于向量化加速这块建议参考 Gluten这也是笔者一直在关注的项目根据 TPC-H 测试结果显示起码有2倍的性能提升但是实际效果还是得看SQL的pattern。但是由于目前我们的Spark 是基于 3.5.0的是比较新的版本而社区这块的融合还在继续所以这块今年应该可以行动起来,可以参考Add spark35 scala213
设置压缩格式为ZSTD
众所周知ZSTD在压缩率和压缩解压缩上的性能都是很突出但是在 NVMe SSD 光环下改成了ZSTD的压缩格式以后效果却不如人意,而且我们还合并了Parallel Compression Support for ZSTD到我们内部的Spark版本中具体的效果如下: 具体的配置项为:“spark.io.compression.codec”:zstd
压缩格式p99p95p90p50avgtask_countlz42806.510000000003412160585.62898089171975628zstd1814.3700000000076118.349999999999878.3000000000000111117.2063492063492378
注意 我们批集群的CPU利用率在60%以上引入zstd以后会增加CPU的使用率而且在这种 long running的服务下得增加driver的core数要不然会导致在进行广播的时候卡住,具体的可以加如下配置:
spark.driver.cores:8,
spark.io.compression.zstd.workers:2,至于效果为什么不理想主要原因还是因为 shuffle数据量不大可能对于单个shuffle数据量大的应用来说可能效果比较明显。
增大可广播join的阈值大小
因为对于这种 long running的服务driver端的内存一般都是挺大的所以可以适当增大spark.sql.autoBroadcastJoinThreshold的配置笔者这里设置了 50MB
引入动态资源分配
对于这种long running服务来说每个时段的任务数肯定是不一样的那这个时候引入动态资源分配的话效果就是很好的具体的配置可以参考如下:
spark.shuffle.service.enabled:false
spark.dynamicAllocation.enabled: true,
spark.dynamicAllocation.shuffleTracking.enabled: true,
spark.dynamicAllocation.minExecutors: 20,
spark.dynamicAllocation.maxExecutors: 40,
spark.dynamicAllocation.executorIdleTimeout:3600