冠县网站建设价格,专门做淘宝代运营的网站,中山精品网站建设方案,大华天途建设集团网站文章目录 问题背景解决方式代码实现Spark写GreenplumSpark读Greenplum 参考 问题背景
通过数据平台上的DataX把Hive表数据同步至Greenplum#xff08;因为DataX原生不支持Greenplum Writer#xff0c;只能采用PostgreSQL驱动的方式#xff09;#xff0c;但是同步速度太慢… 文章目录 问题背景解决方式代码实现Spark写GreenplumSpark读Greenplum 参考 问题背景
通过数据平台上的DataX把Hive表数据同步至Greenplum因为DataX原生不支持Greenplum Writer只能采用PostgreSQL驱动的方式但是同步速度太慢了100Kb/sDataX服务器和Greenplum服务器都在内网实测服务器间传输文件速率可以达到170Mb/s根本没法用。
解决方式
查看Greenplum官网给出了以下几种将外部数据写入Greenplum方式
JDBCJDBC方式写大数据量会很慢。gpload适合写大数据量数据能并行写入。但其缺点是需要安装客户端包括gpfdist等依赖安装起来很麻烦。需要了解可以参考gpload。Greenplum-Spark Connector基于Spark并行处理并行写入Greenplum并提供了并行读取的接口。
而我们之前采用的PostgreSQL驱动的方式就是因为使用了JDBC导致写入速度非常慢。综合官网提供的这3中方式我们最终选择了Greenplum-Spark Connector这种方式但是只提供了Spark2.3版本支持其他版本未验证过。
Greenplum-Spark Connector具体的读写架构和流程请参考Greenplum官网文档https://cn.greenplum.org/greenplum-spark-connector/。
代码实现
Greenplum-Spark Connector需要引入两个依赖包
greenplum-spark_2.11-2.3.0.jarpostgresql-42.2.27.jar
greenplum-spark_2.11-2.3.0.jar无法通过Maven自动下载需要到上面网址手动下载且要先注册网址账号才允许下载。
Spark写Greenplum
代码实现
package com.demoimport org.apache.spark.sql.{SaveMode, SparkSession}import java.time.LocalDateTime
import java.time.format.DateTimeFormatterobject SparkWriteGreenplum {def main(args: Array[String]): Unit {val spark SparkSession.builder().appName(Spark to Greenplum).enableHiveSupport().getOrCreate()spark.sparkContext.setLogLevel(INFO)// main函数传参数获取表名val tableName args(0)val days args(1).toLong/** spark写greenplum *///Greenplum配置信息val gscWriteOptionMap Map(url - jdbc:postgresql://host:5432/db,user - u,password - p,dbschema - schema,dbtable - table)// Hiv表分区val ds LocalDateTime.now().minusDays(days).format(DateTimeFormatter.ofPattern(yyyyMMdd))// 读取Hive表val df spark.sql(select * from db. tableName where ds ds)// Dataframe写Greenplumdf.write.format(greenplum).mode(SaveMode.Overwrite).options(gscWriteOptionMap).save()spark.stop()}
}最终以4个executor、每个executor 1核1G执行Spark任务1400w条数据3分钟左右就导完了效果提升非常明显。
Spark读Greenplum // spark读greenplumval gscReadOptionMap Map(url - jdbc:postgresql://host:5432/db,user - u,password - p,dbschema - sc,dbtable - table)val df: DataFrame spark.read.format(greenplum).options(gscReadOptionMap).load()df.show()参考
https://cn.greenplum.org/greenplum-spark-connector/https://greenplum-spark-connector.readthedocs.io/en/latest/Write-data-from-Spark-into-Greenplum.htmlhttps://network.pivotal.io/products/vmware-greenplum#/releases/1427678/file_groups/17497