上海巨型网站建设,南京做网站联系南京乐识,做招投标有哪些网站,wordpress邮件回复《大数据平台架构与原型实现#xff1a;数据中台建设实战》一书由博主历时三年精心创作#xff0c;现已通过知名IT图书品牌电子工业出版社博文视点出版发行#xff0c;点击《重磅推荐#xff1a;建大数据平台太难了#xff01;给我发个工程原型吧#xff01;》了解图书详…《大数据平台架构与原型实现数据中台建设实战》一书由博主历时三年精心创作现已通过知名IT图书品牌电子工业出版社博文视点出版发行点击《重磅推荐建大数据平台太难了给我发个工程原型吧》了解图书详情京东购书链接https://item.jd.com/12677623.html扫描左侧二维码进入京东手机购书页面。
自Amazon EMR推出Serverlesss形态以来得益于开箱即用和零运维的优质特性越来越多的EMR用户开始尝试EMR Serverless。在使用过程中一个常被提及的问题是我们应该如何在EMR Serverless上提交Spark/Hive作业本文我们将分享一些这方面的最佳实践帮助大家以一种更优雅的方式使用这项服务。
一份通俗易懂的讲解最好配一个形象生动的例子本文选择《CDC一键入湖当 Apache Hudi DeltaStreamer 遇见 Serverless Spark》一文介绍的DeltaStreamer作业作为讲解示例因为这个作业既有一定的通用性又足够复杂可以涵盖大多数EMR Serverless作业遇到的场景更重要的是该作业的提交方式遵循了本文要介绍的各项最佳实践本文是其姊妹篇。不了解Apache Hudi的读者不必担心本文的关注点在于如何提交EMR Serverless作业本身而非DeltaStreamer的技术细节所以不会影响到您阅读此文。
参考范本
首先我们整理一下提交DeltaStreamer CDC作业的几项关键操作下文会以这些脚本为例介绍蕴含其中的各项最佳实践。
1. 导出环境相关变量
export APP_NAMEapache-hudi-delta-streamer
export APP_S3_HOMEs3://apache-hudi-delta-streamer
export APP_LOCAL_HOME/home/ec2-user/apache-hudi-delta-streamer
export EMR_SERVERLESS_APP_ID00fbfel40ee59k09
export EMR_SERVERLESS_EXECUTION_ROLE_ARNarn:aws:iam::123456789000:role/EMR_SERVERLESS_ADMIN2. 创建作业专属工作目录和S3存储桶
mkdir -p $APP_LOCAL_HOME
aws s3 mb $APP_S3_HOME3. 准备作业描述文件
cat EOF $APP_LOCAL_HOME/start-job-run.json
{name:apache-hudi-delta-streamer,applicationId:$EMR_SERVERLESS_APP_ID,executionRoleArn:$EMR_SERVERLESS_EXECUTION_ROLE_ARN,jobDriver:{sparkSubmit:{entryPoint:/usr/lib/hudi/hudi-utilities-bundle.jar,entryPointArguments:[--continuous,--enable-sync,--table-type, COPY_ON_WRITE,--op, UPSERT,--target-base-path, $APP_S3_HOME/data/mysql-server-3/inventory/orders,--target-table, orders,--min-sync-interval-seconds, 60,--source-class, org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource,--source-ordering-field, _event_origin_ts_ms,--payload-class, org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload,--hoodie-conf, bootstrap.servers$KAFKA_BOOTSTRAP_SERVERS,--hoodie-conf, schema.registry.url$SCHEMA_REGISTRY_URL,--hoodie-conf, hoodie.deltastreamer.schemaprovider.registry.url${SCHEMA_REGISTRY_URL}/subjects/osci.mysql-server-3.inventory.orders-value/versions/latest,--hoodie-conf, hoodie.deltastreamer.source.kafka.value.deserializer.classio.confluent.kafka.serializers.KafkaAvroDeserializer,--hoodie-conf, hoodie.deltastreamer.source.kafka.topicosci.mysql-server-3.inventory.orders,--hoodie-conf, auto.offset.resetearliest,--hoodie-conf, hoodie.datasource.write.recordkey.fieldorder_number,--hoodie-conf, hoodie.datasource.write.partitionpath.fieldorder_date,--hoodie-conf, hoodie.datasource.hive_sync.partition_extractor_classorg.apache.hudi.hive.MultiPartKeysValueExtractor,--hoodie-conf, hoodie.datasource.write.hive_style_partitioningtrue,--hoodie-conf, hoodie.datasource.hive_sync.databaseinventory,--hoodie-conf, hoodie.datasource.hive_sync.tableorders,--hoodie-conf, hoodie.datasource.hive_sync.partition_fieldsorder_date],sparkSubmitParameters:--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --conf spark.serializerorg.apache.spark.serializer.KryoSerializer --conf spark.hadoop.hive.metastore.client.factory.classcom.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.jars$(aws s3 ls $APP_S3_HOME/jars/ | grep -o \S*\.jar$| awk {print $APP_S3_HOME/jars/$1,} | tr -d \n | sed s/,$//)}},configurationOverrides:{monitoringConfiguration:{s3MonitoringConfiguration:{logUri:$APP_S3_HOME/logs}}}
}
EOF
jq . $APP_LOCAL_HOME/start-job-run.json4. 提交作业
export EMR_SERVERLESS_JOB_RUN_ID$(aws emr-serverless start-job-run \--no-paginate --no-cli-pager --output text \--name apache-hudi-delta-streamer \--application-id $EMR_SERVERLESS_APP_ID \--execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \--execution-timeout-minutes 0 \--cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \--query jobRunId)5. 监控作业
now$(date %s)sec
while true; dojobStatus$(aws emr-serverless get-job-run \--no-paginate --no-cli-pager --output text \--application-id $EMR_SERVERLESS_APP_ID \--job-run-id $EMR_SERVERLESS_JOB_RUN_ID \--query jobRun.state)if [ $jobStatus PENDING ] || [ $jobStatus SCHEDULED ] || [ $jobStatus RUNNING ]; thenfor i in {0..5}; doecho -ne \E[33;5m The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now %H:%M:%S) ] ....\r\E[0msleep 1doneelseecho -ne The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]\n\nbreakfi
done6. 检查错误
JOB_LOG_HOME$APP_LOCAL_HOME/log/$EMR_SERVERLESS_JOB_RUN_ID
rm -rf $JOB_LOG_HOME mkdir -p $JOB_LOG_HOME
aws s3 cp --recursive $APP_S3_HOME/logs/applications/$EMR_SERVERLESS_APP_ID/jobs/$EMR_SERVERLESS_JOB_RUN_ID/ $JOB_LOG_HOME /dev/null
gzip -d -r -f $JOB_LOG_HOME /dev/null
grep --coloralways -r -i -E error|failed|exception $JOB_LOG_HOME最佳实践 (1)提取环境相关信息集中配置提升脚本可移植性
※ 此项最佳实践参考《参考范本1. 导出环境相关变量》
在EMR Serverless的作业脚本中经常会出现与AWS账号和本地环境有关的信息例如资源的ARN各种路径等当我们要在不同环境如开发、测试或生产中提交作业时就需要查找和替换这些环境相关的信息。为了让脚本具备良好的可移植性推荐的做法是将这些信息抽离出来以全局变量的形式集中配置这样当在一个新环境新的AWS账号或服务器中提交作业时只需修改这些变量即可而不是具体的脚本。
最佳实践 (2)为作业创建专用的工作目录和S3存储桶
※ 此项最佳实践参考《参考范本2. 创建作业专属工作目录和S3存储桶》
为一个作业或应用程序创建专用的工作目录和S3存储桶是一个良好的规范和习惯。一方面将本作业/应用的所有“资源”包括脚本、配置文件、依赖包、日志以及产生的数据统一存放在有利于集中管理和维护如果要在Linux和S3上给作业赋予读写权限操作起来了也会简单一些。
最佳实践 (3)使用作业描述文件规避字符转义问题
※ 此项最佳实践参考《参考范本3. 准备作业描述文件》
我们通常见到的EMR Serverless作业提交示例是将作业描述以字符串参数形式传递给命令行的就像下面这样
aws emr-serverless start-job-run \--application-id $EMR_SERVERLESS_APP_ID \--execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \--job-driver {sparkSubmit: {entryPoint: s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py,entryPointArguments: [s3://my-job-bucket/output]}}这种方式只能应对简单的作业提交当作业中包含大量参数和变量时很容易出现单引号、双引号、美元符等特殊字符的转义问题由于这里牵涉shell字符串和json字符串的双重嵌套和解析所以会非常麻烦。此时在命令行中给出作业描述是很不明智的更好的做法是使用cat命令联合heredoc来创建作业描述文件然后在命令行中以--cli-input-json file://xxx.json形式将作业描述传递给命令行
# 生成作业描述文件
cat EOF xxx.json... ...... ...... ...
EOF
# 使用作业描述文件提交作业
aws emr-serverless start-job-run ... --cli-input-json file://xxx.json ...这是一个非常重要的技巧使用这种形式提交作业有如下两个好处 在cat heredoc中编辑的文本为原生字符串不用考虑字符转义问题 在cat heredoc中可嵌入shell变量、函数调用和if…else等结构体实现“动态”构建作业描述文件
最佳实践 (4)在作业描述文件中嵌入shell变量和脚本片段实现“动态”构建
※ 此项最佳实践参考《参考范本3. 准备作业描述文件》
如上所述采用cat heredoc编辑作业描述文件后可以在编辑文件的过程中嵌入shell变量、函数调用和if...else...等复合结构体使得我们可以动态构建作业描述文件这是非常重要的一个能力。在《参考范本3. 准备作业描述文件》中有一个很好的例证就是“动态拼接依赖Jar包的路径”
--conf spark.jars$(aws s3 ls $APP_S3_HOME/jars/ | grep -o \S*\.jar$| awk {print $APP_S3_HOME/jars/$1,} | tr -d \n | sed s/,$//)这是在构建作业描述文件start-job-run.json的过程中通过$(....)嵌入的一段shell脚本这段脚本遍历了指定目录下的jar文件并拼接成一个字符串输出出来而输出的字符串会在嵌入脚本的地方变成文本的一部分我们还可以在编辑文本时调用shell函数嵌入if...else...whilecase等多重复合逻辑结构让作业描述文件可以根据不同的参数和条件动态生成期望的内容这种灵活性足以让开发者应对任何复杂的情况。
最佳实践 (5)使用jq校验并格式化作业描述文件
※ 此项最佳实践参考《参考范本3. 准备作业描述文件》
jq是一个处理json文件的命令行工具对于AWS CLI来说jq可以说是一个“最佳伴侣”。原因是使用AWS CLI创建资源时除了传入常规参数之外还可以通过--cli-input-json参数传入一个json文件来描述所要创建的资源。当创建的资源配置过于复杂时json文件的优势就会凸显出来就像我们参考范本中的这个EMR Serverless Job一样。所以使用AWS CLI时经常有编辑和操作json文件的需求此时jq就成为了一个强有力的辅助工具。在参考范本中我们仅仅使用jq打印了一下生成的作业描述文件
jq . $APP_LOCAL_HOME/start-job-run.json这一步操作有两个作用一是利用jq校验了json文件这能帮助排查文件中的json格式错误二是jq输出的json经过了格式化和语法着色更加易读。
其实jq在AWS CLI上还有更多高级应用只是在我们的参考范本中并没有体现出来。在某些情况下我们可以通过jq直接检索和编辑作业描述文件将jq和使用cat heredoc的json编辑方式结合起来可以创建更加复杂和动态化的作业描述文件。
最佳实践 (6)可复用的依赖Jar包路径拼接脚本
※ 此项最佳实践参考《参考范本3. 准备作业描述文件》
拼接依赖Jar包路径几乎是每个作业都要解决的问题手动拼接虽然可行但费力且容易出错。过去在本地环境中我们可以使用--jars $(echo /path/*.jar | tr ,)这种简洁而优雅的方式拼接Jar包路径。但是EMR Serverless作业的依赖Jar包是存放在S3上的这此我们针对性地编写了一段可复用的脚本来拼接位于S3指定目录下的Jar包路径供大家参考请注意替换脚本中出现的两处文件夹路径
aws s3 ls $APP_S3_HOME/jars/ | grep -o \S*\.jar$| awk {print $APP_S3_HOME/jars/$1,} | tr -d \n | sed s/,$//最佳实践 (7)可复用的作业监控脚本
※ 此项最佳实践参考《参考范本5. 监控作业》
使用命令行提交EMR Serverless作业后用户可以转到AWS控制台上查看作业的状态但是对开发者来说这种切换会分散注意力最完美的方式莫过于提交作业后继续在命令行窗口监控作业状态直到其失败或成功运行。为此《参考范本5. 监控作业》给出了一种实现可复用于所有EMR Serverless作业供大家参考。
最佳实践 (8)可复用的日志错误信息检索脚本
※ 此项最佳实践参考《参考范本6. 检查错误》
在日常开发中“提交作业报错 - 查看日志中的报错信息 - 修改代码重新提交”是一个反复迭代的过程在EMR Serverless中用户需要切换到AWS控制台查看错误日志并且有时日志量会非常大在控制台上查看效率很低。一种更高效的做法是将存放于S3上的日志文件统一下载到本地并解压然后使用grep命令快速检索日志中含有error, failed, exception等关键字的行然后再打开具体文件仔细查看。将这些动作脚本化后我们就能得到一段可复用的日志错误信息检索脚本对于调试和排查错误有很大的帮助。为此《参考范本6. 检查错误》给出了一种实现可复用于所有EMR Serverless作业供大家参考。