Common Flink Deployment Modes

Flink Deployment and Execution Modes
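Flink deployment modes

Local mode, Standalone mode, and Flink on YARN mode are the three common ways to deploy Flink.

1. Local mode
In Local mode, Flink runs on a single machine without starting a distributed resource manager. This mode is meant for local development and testing, i.e., for verifying that Flink code is correct and checking its performance.

2. Standalone mode
In Standalone mode, Flink runs as an independent cluster. You start Flink's JobManager and TaskManagers: the JobManager receives and schedules jobs, and the TaskManagers execute them.

3. Flink on YARN mode
In Flink on YARN mode, Flink runs on top of YARN, Hadoop's resource scheduling and cluster management system. Flink runs as a YARN application and relies on YARN for resource allocation and task scheduling, so it can make full use of the Hadoop cluster's resources for distributed computation.

Flink execution modes

Flink can execute an application in one of the following three ways:

1. Session Mode
Session mode first starts a cluster and keeps a session running; jobs are then submitted to this session through a client. All resources are fixed when the cluster starts, and all submitted jobs compete for the cluster's resources. It suits a large number of small jobs with short execution times.
The Flink execution environment stays on the cluster until the session is explicitly terminated. Multiple jobs can therefore be submitted and share the same cluster resources, which improves efficiency and resource utilization.

2. Per-Job Mode
Each Flink application is submitted and executed as an independent job. Every submission creates a dedicated job execution environment that is used only for that particular job. When the job finishes, the environment is released: the cluster shuts down and its resources are freed.

3. Application Mode
Application mode can be seen as an upgrade of the previous two. In those modes, the Flink program's code runs on the client, which then submits the job to the JobManager; this consumes a large amount of the client's network bandwidth.
Application mode starts a dedicated JobManager for each submitted application, and the application runs on that JobManager; in other words, a cluster is created per application. This JobManager exists only to run this one application and shuts down when it finishes.

4. Differences among the three modes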
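The three modes differ in two respects: the cluster lifecycle together with its resource isolation guarantees, and whether the application's main() method is executed on the client or on the cluster.
Session mode: jobs share one pre-started cluster that outlives them; main() runs on the client.
Per-Job mode: each job gets its own cluster, which is released when the job finishes; main() runs on the client.
Application mode: each application gets its own cluster, which shuts down when the application finishes; main() runs on the cluster (in the JobManager).

Local mode
Local mode is the simplest deployment mode Flink provides. It runs on a single server and is suitable for day-to-day development and debugging.
Note: Flink requires a Java runtime, so a JDK must be installed beforehand.

Download and install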
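Flink download archive: https://archive.apache.org/dist/flink/

Download Flink:
wget https://repo.huaweicloud.com/apache/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz

Extract and rename:
tar -zxvf flink-1.17.0-bin-scala_2.12.tgz
mv flink-1.17.0 flink

Start and stop Flink
No configuration is needed: with Flink's default configuration, simply run the script to start it:
bin/start-cluster.sh

Stop Flink:
bin/stop-cluster.sh

While the cluster is running, open http://IP:8081 to see Flink's web dashboard. Here each TaskManager has 3 slots.

Submit a test job
Submit the bundled batch WordCount example:
./bin/flink run examples/batch/WordCount.jar

The output is printed directly to the console:
[root@node01 flink]# ./bin/flink run examples/batch/WordCount.jar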
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/program/flink/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/program/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID a946d0abf84ac6848a823cec43f7056f
Program execution finished
Job with JobID a946d0abf84ac6848a823cec43f7056f has finished.
Job Runtime: 584 ms
Accumulator Results:
- 1a50b4c9582d4d35a854872c62391768 (java.util.ArrayList) [170 elements]
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
(awry,1)

Likewise, the program that was just executed appears under Completed Jobs in the Flink web dashboard.
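The console output above consists of (word,count) pairs. To make that format easier to interpret, the plain-Java sketch below reproduces only the counting logic the WordCount example is assumed to apply (lower-case each line, split on runs of non-word characters, count each token). It uses no Flink APIs, is not the example's actual source, and the class name WordCountSketch is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free sketch of WordCount-style counting:
// lower-case each line, split on runs of non-word characters, count each token.
final class WordCountSketch {

    static Map<String, Integer> count(String... lines) {
        // TreeMap keeps the words sorted alphabetically for readable output
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                // A line that starts with punctuation yields an empty first token; skip it
                if (!token.isEmpty()) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Print in the same "(word,count)" shape as the console output above
        count("To be, or not to be,", "that is the question")
                .forEach((word, n) -> System.out.println("(" + word + "," + n + ")"));
    }
}
```

In the real job the same idea runs distributed: tokens are emitted as (word, 1) pairs, grouped by word, and summed.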
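Stop a job
You can cancel a job by clicking its Cancel Job button in the web UI, or from the command line. To cancel from the command line, first obtain the job's JobID:
bin/flink list

With the JobID, cancel the job using flink cancel <JobID>:
bin/flink cancel a946d0abf84ac6848a823cec43f7056f

Standalone mode
Standalone mode is a cluster mode that runs on its own without depending on any external resource management platform. When resources run short or failures occur, it cannot automatically scale out or reallocate resources, so it is generally used for development and testing or in scenarios with very few jobs.

Pros and cons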
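Pros: relatively simple to deploy; supports running a small number of small-scale jobs.
Cons: no system-level management of the jobs in the cluster, which easily leads to uneven resource allocation; resource isolation is rudimentary, so jobs compete heavily for resources.

Session mode
In session mode, the cluster is started first and its resources are fixed; jobs are then submitted through the web UI or the client, and multiple jobs can run on it.
To set up a Flink cluster, see the article "Setting up a Flink cluster, cluster HA, and configuring the history server".

1. Start the Flink cluster
Start the cluster with the bin/start-cluster.sh script.
2. Open the Flink Web UI
Enter http://node01:8081/ in a browser to open the Flink Web UI.
3. Submit a Flink job
In the Flink Web UI, select the jar of the job to submit and specify the job parameters and job name. Alternatively, submit it from the command line (run from the Flink installation directory):
bin/flink run examples/streaming/WordCount.jar
4. Monitor the Flink job
After the job is submitted, the Flink Web UI shows its running status, and you can view its logs, metrics, and other information.
5. Stop the Flink job
Stop the job in the Flink Web UI, or stop a specific job with bin/flink cancel <jobID>.

Per-Job mode
A Standalone cluster does not support per-job deployment; per-job mode requires a resource management platform.

Application mode
In application mode no cluster is created in advance, so the start-cluster.sh script cannot be used. Instead, use the standalone-job.sh script in the bin directory to start a JobManager.
1. Put the Flink application's jar into the lib directory under the Flink installation path:
[root@node01 flink]# mv /root/demo-1.0-SNAPSHOT.jar lib

2. Start netcat:
[root@node01 ~]# nc -lk 8888

3. Start the JobManager
Specify the job's entry class directly; the script scans all jar files in the lib directory:
[root@node01 flink]# bin/standalone-job.sh start --job-classname cn.ybzy.demo.WordCountDemo
Starting standalonejob daemon on host node01.

4. Start the TaskManager:
[root@node01 flink]# bin/taskmanager.sh start
Starting taskexecutor daemon on host node01.

5. Check the processes:
[root@node01 flink]# jps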
11973 Jps
11240 TaskManagerRunner
11898 StandaloneApplicationClusterEntryPoint

6. Check the Web UI
The Web UI stays stuck in this state, which is clearly abnormal. Check the log file flink/log/flink-root-standalonejob-1-node01.log.

1. The exception reports insufficient resources:
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_371]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_371]
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607) ~[?:1.8.0_371]
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) ~[?:1.8.0_371]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_371]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_371]

Increasing the memory settings in the configuration file did not help:
# jobmanager.memory.process.size: 1600m
jobmanager.memory.process.size: 2000m
#taskmanager.memory.process.size: 1728m
taskmanager.memory.process.size: 2600m

After examining the log more closely, I found the key message:
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 6f4f54c45d7bb59531f537b966776793: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=3}]
The important part is numberOfRequiredSlots=3: the job requests 3 slots, but the TaskManager provides only 1 slot by default, so there are not enough slots.

Edit conf/flink-conf.yaml:
# taskmanager.numberOfTaskSlots: 1
# Change the number of slots to 3
taskmanager.numberOfTaskSlots: 3

Stop the TaskManager and the standalone job, then start them again. The Web UI now shows the job running normally.

Send test data:
[root@node01 ~]# nc -lk 8888
abc bcd cdf

7. Stop the cluster
[root@node01 flink]# bin/taskmanager.sh stop
Stopping taskexecutor daemon (pid: 14117) on host node01.
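[root@node01 flink]# bin/standalone-job.sh stop
No standalonejob daemon (pid: 14813) is running anymore on node01.

8. Summary
In Flink, the slot is the basic unit of resource management, and a job does not necessarily occupy just one slot.
When a job is submitted, Flink allocates the number of slots it needs, which usually depends on the following factors:
1. Parallelism of the job: the higher the parallelism, i.e., the more subtasks that must run at the same time, the more slots are needed. With the default slot sharing, a job needs as many slots as the highest parallelism of any of its operators.
2. TaskManager resources: if a TaskManager has plenty of resources (for example, several CPU or GPU cores), it can be configured with more slots to run tasks; otherwise it can offer only a few.
3. Resource requirements of the job: if the job needs a lot of memory or compute, more slots may be needed to satisfy it.
In my case, I set a parallelism in the Flink program, then packaged, uploaded, and ran it. Because taskmanager.numberOfTaskSlots defaults to 1, there were not enough slots, which produced the exception above.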
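The slot arithmetic behind this failure can be sketched as follows. The sketch assumes the default slot-sharing behavior (a job needs as many slots as its highest operator parallelism, which matches numberOfRequiredSlots=3 in the log) and a Standalone cluster whose capacity is fixed at TaskManagers × taskmanager.numberOfTaskSlots. The class SlotCheck, its methods, and the example pipeline parallelisms are hypothetical.

```java
import java.util.Arrays;

// Hypothetical helper illustrating slot arithmetic under default slot sharing.
final class SlotCheck {

    // With default slot sharing, one slot can hold one subtask of every operator,
    // so the job needs as many slots as its largest operator parallelism.
    static int requiredSlots(int... operatorParallelisms) {
        return Arrays.stream(operatorParallelisms).max().orElse(0);
    }

    // A Standalone cluster's capacity is fixed when its TaskManagers start.
    static int availableSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    // TaskManagers needed to provide the required slots (ceiling division).
    static int taskManagersNeeded(int requiredSlots, int slotsPerTaskManager) {
        if (slotsPerTaskManager < 1) {
            throw new IllegalArgumentException("slotsPerTaskManager must be >= 1");
        }
        return (requiredSlots + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // Hypothetical pipeline: a parallelism-1 source followed by operators
        // running with env.setParallelism(3)
        int required = requiredSlots(1, 3, 3, 3);
        System.out.println("required slots: " + required);
        System.out.println("1 TaskManager x 1 slot  -> available " + availableSlots(1, 1));
        System.out.println("1 TaskManager x 3 slots -> available " + availableSlots(1, 3));
        System.out.println("TaskManagers needed at 1 slot each: " + taskManagersNeeded(required, 1));
    }
}
```

With one TaskManager and the default of 1 slot, 1 < 3 and the job cannot be scheduled; raising taskmanager.numberOfTaskSlots to 3 closes the gap. On YARN, taskManagersNeeded roughly corresponds to how many TaskManager containers Flink would request.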
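The parallelism was set in the program as follows:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

YARN mode
The client submits the Flink application to YARN's ResourceManager, which requests containers from YARN's NodeManagers. Flink deploys JobManager and TaskManager instances in these containers, thereby starting the cluster. Flink dynamically allocates TaskManager resources according to the number of slots required by the jobs running on the JobManager.
1. Install Hadoop
To install Hadoop, see the article "Building a fully distributed Hadoop 3.x cluster".
2. Configure environment variables: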
# Hadoop
export HADOOP_HOME=/usr/local/program/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
# Flink
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

3. Start the Hadoop cluster (HDFS and YARN):
[root@node01 hadoop]# sbin/start-all.sh

4. Start netcat:
nc -lk 8888

Session mode
YARN session mode first requests a YARN session, which starts a Flink cluster.

Start the Hadoop cluster
Start the Hadoop cluster (HDFS and YARN):
[root@node01 hadoop]# sbin/start-all.sh

Request a YARN session
View the help for the yarn-session.sh command:
[root@node01 flink]# bin/yarn-session.sh --help
Usage:
   Optional
     -at,--applicationType <arg>     Set a custom application type for the application on YARN
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Set to yarn-cluster to use YARN execution mode.
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

Main options:
-d (--detached): detached mode; the Flink YARN client runs in the background, so the YARN session keeps running in the background.
-jm (--jobManagerMemory): memory for the JobManager, in MB by default.
-nm (--name): the name shown for the application in the YARN UI.
-qu (--queue): the YARN queue to use.
-tm (--taskManagerMemory): memory used by each TaskManager.

Run the script to request resources from the YARN cluster, open a YARN session, and start a Flink cluster:
[root@node01 flink]# bin/yarn-session.sh -nm flink-test
......
2023-06-12 22:03:01,088 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 22:03:01,428 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 22:03:01,457 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 22:03:01,476 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cannot use kerberos delegation token manager, no valid kerberos credentials provided.
2023-06-12 22:03:01,480 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Submitting application master application_1686577483648_0001
2023-06-12 22:03:01,613 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Submitted application application_1686577483648_0001
2023-06-12 22:03:01,613 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Waiting for the cluster to be allocated
2023-06-12 22:03:01,615 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deploying cluster, current state ACCEPTED
2023-06-12 22:03:06,406 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully.
2023-06-12 22:03:06,407 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node03:37824 of application application_1686577483648_0001.
JobManager Web Interface: http://node03:37824

View YARN and Flink
Visit http://node01:8088/cluster to view YARN. After the YARN session starts, it prints a Web UI address and a YARN application ID:
2023-06-12 22:03:06,406 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully.
2023-06-12 22:03:06,407 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node03:37824 of application application_1686577483648_0001.
JobManager Web Interface: http://node03:37824

Open the given address, http://node03:37824, to view the Flink Web UI.
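Because the Web UI address and the YARN application ID are printed in a fixed log line (the "Found Web Interface ... of application ..." message above), scripts that drive later steps, such as -m or -Dyarn.application.id, can extract them. The Java sketch below assumes exactly the message format shown in this log, which may differ across Flink versions; the class YarnSessionLogParser is hypothetical.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the JobManager Web UI address and the YARN application ID
// from the "Found Web Interface ... of application ..." startup log line.
final class YarnSessionLogParser {

    static final class SessionInfo {
        final String webInterface;   // host:port of the JobManager Web UI
        final String applicationId;  // application_<clusterTimestamp>_<sequence>

        SessionInfo(String webInterface, String applicationId) {
            this.webInterface = webInterface;
            this.applicationId = applicationId;
        }
    }

    // Assumed log format, taken from the output above
    private static final Pattern FOUND_WEB_UI = Pattern.compile(
            "Found Web Interface (\\S+:\\d+) of application (application_\\d+_\\d+)");

    static Optional<SessionInfo> parse(String logLine) {
        Matcher m = FOUND_WEB_UI.matcher(logLine);
        if (!m.find()) {
            return Optional.empty();   // not the line we are looking for
        }
        return Optional.of(new SessionInfo(m.group(1), m.group(2)));
    }

    public static void main(String[] args) {
        String line = "2023-06-12 22:03:06,407 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - "
                + "Found Web Interface node03:37824 of application application_1686577483648_0001.";
        parse(line).ifPresent(info -> {
            System.out.println("Web UI:         http://" + info.webInterface);
            System.out.println("application ID: " + info.applicationId);
        });
    }
}
```

Feeding every line of the yarn-session.sh output through parse and keeping the first match is enough; all other lines return an empty Optional.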
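Submit a job
Jobs can be submitted either through the Web UI or from the command line.
a. Submit through the Web UI
b. Submit from the command line
1. Package the Flink program as a jar and upload it to the cluster.
2. Run the following command to submit the job to the running YARN session. The client can determine the JobManager's address on its own (as the log below shows, it finds the YARN properties file under /tmp), or you can specify it with the -m (--jobmanager) option; the address is also shown in the YARN session's startup output.
[root@node01 ~]# /usr/local/program/flink/bin/flink run -c cn.ybzy.demo.WordCountDemo /root/demo-1.0-SNAPSHOT.jar
2023-06-12 22:21:08,468 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root.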
2023-06-12 22:21:08,468 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2023-06-12 22:21:08,824 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory (/usr/local/program/flink/conf) already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-06-12 22:21:08,860 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node01/192.168.1.100:8032
2023-06-12 22:21:08,986 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-06-12 22:21:09,049 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node03:37824 of application application_1686577483648_0001.
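Job has been submitted with JobID cdf1ff7b48472b3d7bc413a1ee9700e8

View and test the job
Check the submitted job's status in the Flink Web UI. Flink dynamically allocates TaskManager resources according to the number of slots required by the jobs running on the JobManager.

Send data to test:
[root@node01 program]# nc -lk 8888
abc bcd cdf

Per-Job mode
With YARN, an external platform handles resource scheduling, so a single job can also be submitted directly to YARN, which starts a Flink cluster dedicated to that job. Note that per-job mode has been deprecated since Flink 1.15 in favor of application mode.

Submit a job
Run the command to submit the job:
[root@node01 flink]# bin/flink run -t yarn-per-job -c cn.ybzy.demo.WordCountDemo /root/demo-1.0-SNAPSHOT.jar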
.....
2023-06-12 22:46:26,984 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 22:46:27,009 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 22:46:27,029 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cannot use kerberos delegation token manager, no valid kerberos credentials provided.
2023-06-12 22:46:27,034 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Submitting application master application_1686577483648_0004
2023-06-12 22:46:27,061 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Submitted application application_1686577483648_0004
2023-06-12 22:46:27,061 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Waiting for the cluster to be allocated
2023-06-12 22:46:27,063 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deploying cluster, current state ACCEPTED
2023-06-12 22:46:31,086 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully.
2023-06-12 22:46:31,087 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node02:42192 of application application_1686577483648_0004.
Job has been submitted with JobID dfcb72ebf4a5f33d8e7967d6beaaf96d

Note: when the -d option is used, the following exception may appear during startup:
Exception in thread "Thread-5" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration classloader.check-leaked-classloader.
    at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:184)
    at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:208)
    at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2780)
    at org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3036)
    at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2995)
    at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2968)
    at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2848)
    at org.apache.hadoop.conf.Configuration.get(Configuration.java:1200)
    at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1812)
    at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1789)
    at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
    at org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
    at org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)

The workaround is to set the following in Flink's conf/flink-conf.yaml:
classloader.check-leaked-classloader: false

View YARN and Flink
Visit http://node01:8088/cluster to check the application, and open the Flink Web UI to monitor it:
a. Open the JobManager address from the startup log, e.g., node02:42192.
b. Alternatively, follow the link from the http://node01:8088/cluster page to the Flink Web UI.

View and cancel jobs
[root@node01 flink]# bin/flink list -t yarn-per-job -Dyarn.application.id=application_1686577483648_0004
2023-06-12 22:55:43,755 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2023-06-12 22:55:43,755 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2023-06-12 22:55:43,864 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory (/usr/local/program/flink/conf) already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-06-12 22:55:43,927 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node01/192.168.1.100:8032
2023-06-12 22:55:44,087 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-06-12 22:55:44,159 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node02:42192 of application application_1686577483648_0004.
Waiting for response...
------------------ Running/Restarting Jobs -------------------
12.06.2023 22:46:30 : dfcb72ebf4a5f33d8e7967d6beaaf96d : Flink Streaming Job (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

Cancel a job
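flink cancel needs the JobID, which appears in each line of the Running/Restarting Jobs list printed above. When cancellation is scripted, the ID can be pulled out of that output as in this Java sketch. It assumes the line format shown here ("date time : JobID : job name (STATUS)", with a 32-character hex JobID); the class FlinkListParser is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for `flink list` job lines such as:
// "12.06.2023 22:46:30 : dfcb72ebf4a5f33d8e7967d6beaaf96d : Flink Streaming Job (RUNNING)"
final class FlinkListParser {

    static final class JobEntry {
        final String jobId;
        final String name;
        final String status;

        JobEntry(String jobId, String name, String status) {
            this.jobId = jobId;
            this.name = name;
            this.status = status;
        }
    }

    // Fields are separated by " : " (the timestamp's own colons have no spaces);
    // a JobID is 32 hex characters.
    private static final Pattern JOB_LINE = Pattern.compile(
            "^.+? : ([0-9a-f]{32}) : (.+) \\(([A-Z_]+)\\)$");

    static List<JobEntry> parse(List<String> outputLines) {
        List<JobEntry> jobs = new ArrayList<>();
        for (String line : outputLines) {
            Matcher m = JOB_LINE.matcher(line.trim());
            if (m.matches()) {   // log lines, headers and separators are skipped
                jobs.add(new JobEntry(m.group(1), m.group(2), m.group(3)));
            }
        }
        return jobs;
    }

    public static void main(String[] args) {
        List<String> output = Arrays.asList(
                "------------------ Running/Restarting Jobs -------------------",
                "12.06.2023 22:46:30 : dfcb72ebf4a5f33d8e7967d6beaaf96d : Flink Streaming Job (RUNNING)",
                "--------------------------------------------------------------",
                "No scheduled jobs.");
        for (JobEntry job : parse(output)) {
            System.out.println(job.status + " " + job.name + " -> " + job.jobId);
        }
    }
}
```

The extracted jobId is then passed as the last argument of the cancel command below.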
# In per-job mode, cancelling the job shuts down the whole Flink cluster
bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX <jobId>
[root@node01 flink]# bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_1686577483648_0004 dfcb72ebf4a5f33d8e7967d6beaaf96d
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2023-06-12 22:57:06,430 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2023-06-12 22:57:06,430 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root.
Cancelling job dfcb72ebf4a5f33d8e7967d6beaaf96d.
2023-06-12 22:57:06,560 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory (/usr/local/program/flink/conf) already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-06-12 22:57:06,638 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node01/192.168.1.100:8032
2023-06-12 22:57:06,830 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-06-12 22:57:06,895 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node02:42192 of application application_1686577483648_0004.
Cancelled job dfcb72ebf4a5f33d8e7967d6beaaf96d.

Application mode
Application mode is just as simple: as with per-job mode, run the flink run-application command directly.

Submit a job
Run the command to submit the job:
[root@node01 flink]# bin/flink run-application -t yarn-application -c cn.ybzy.demo.WordCountDemo /root/demo-1.0-SNAPSHOT.jar
2023-06-12 23:01:00,465 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 23:01:00,751 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 23:01:00,799 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 23:01:00,817 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cannot use kerberos delegation token manager, no valid kerberos credentials provided.
2023-06-12 23:01:00,821 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Submitting application master application_1686577483648_0005
2023-06-12 23:01:00,847 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Submitted application application_1686577483648_0005
2023-06-12 23:01:00,848 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Waiting for the cluster to be allocated
2023-06-12 23:01:00,849 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deploying cluster, current state ACCEPTED
2023-06-12 23:01:05,123 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully.
2023-06-12 23:01:05,124 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node03:40762 of application application_1686577483648_0005.

View and cancel jobs
View jobs:
[root@node01 flink]# bin/flink list -t yarn-application -Dyarn.application.id=application_1686577483648_0005
2023-06-12 23:02:55,490 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2023-06-12 23:02:55,490 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2023-06-12 23:02:55,630 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory (/usr/local/program/flink/conf) already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-06-12 23:02:55,689 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node01/192.168.1.100:8032
2023-06-12 23:02:55,844 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-06-12 23:02:55,905 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node03:40762 of application application_1686577483648_0005.
Waiting for response...
------------------ Running/Restarting Jobs -------------------
12.06.2023 23:01:05 : a66d8fa98d23210d36b5b005ff0a1c53 : Flink Streaming Job (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

Cancel a job:
[root@node01 flink]# bin/flink cancel -t yarn-application -Dyarn.application.id=application_1686577483648_0005 a66d8fa98d23210d36b5b005ff0a1c53
2023-06-12 23:03:49,038 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2023-06-12 23:03:49,038 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root.
Cancelling job a66d8fa98d23210d36b5b005ff0a1c53.
2023-06-12 23:03:49,156 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory (/usr/local/program/flink/conf) already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-06-12 23:03:49,204 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node01/192.168.1.100:8032
2023-06-12 23:03:49,364 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-06-12 23:03:49,427 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node03:40762 of application application_1686577483648_0005.
Cancelled job a66d8fa98d23210d36b5b005ff0a1c53.

Submitting from HDFS
The yarn.provided.lib.dirs option points Flink to a remote location for its dependencies. Uploading Flink's own dependencies and the user jar to HDFS in advance means they no longer have to be shipped to the cluster on every submission, which makes job submission more lightweight.

Upload Flink's lib and plugins directories to HDFS:
[root@node01 flink]# hadoop fs -mkdir /flink-dist
[root@node01 flink]# hadoop fs -put lib/ /flink-dist
[root@node01 flink]# hadoop fs -put plugins/ /flink-dist

Upload the Flink application jar to HDFS:
[root@node01 flink]# hadoop fs -mkdir /flink-jar
[root@node01 flink]# hadoop fs -put /root/demo-1.0-SNAPSHOT.jar /flink-jar

Submit the job:
[root@node01 flink]# bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs=hdfs://node01:9000/flink-dist -c cn.ybzy.demo.WordCountDemo hdfs://node01:9000/flink-jar/demo-1.0-SNAPSHOT.jar
2023-06-12 23:19:20,128 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cluster specification: ClusterSpecification{masterMemoryMB=2500, taskManagerMemoryMB=2200, slotsPerTaskManager=3}
2023-06-12 23:19:20,617 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 23:19:20,721 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 23:19:20,783 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cannot use kerberos delegation token manager, no valid kerberos credentials provided.
2023-06-12 23:19:20,788 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Submitting application master application_1686577483648_0009
2023-06-12 23:19:20,816 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Submitted application application_1686577483648_0009
2023-06-12 23:19:20,816 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Waiting for the cluster to be allocated
2023-06-12 23:19:20,817 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deploying cluster, current state ACCEPTED
2023-06-12 23:19:24,086 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully.
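2023-06-12 23:19:24,086 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node02:43653 of application application_1686577483648_0009.

High availability in YARN mode
In Standalone mode, several JobManagers are started at the same time: one is the leader and the others are standbys, and only when the leader fails does one of the standbys become the new leader.
YARN high availability instead runs only one JobManager at a time. When that JobManager fails, YARN starts a new one; in other words, HA relies on YARN's application attempt (retry) mechanism.

Configure the following in yarn-site.xml:
<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>The maximum number of application master execution attempts.</description>
</property>

Configure the following in flink-conf.yaml:
# Should not exceed yarn.resourcemanager.am.max-attempts in yarn-site.xml, because YARN limits the number of attempts to that value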
yarn.application-attempts: 3
high-availability.type: zookeeper
high-availability.storageDir: hdfs://node01:9000/flink/yarn/ha
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181
high-availability.zookeeper.path.root: /flink-yarn

Start a YARN session:
[root@node01 flink]# bin/yarn-session.sh -nm flink-test

Kill the JobManager and check whether YARN brings it back:
jps
kill -9 <pid>
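Flink's configuration reference notes that the number of ApplicationMaster restarts is also limited by YARN via yarn.resourcemanager.am.max-attempts. Treating that as a simple upper bound, the effective limit is the smaller of the two values, and the first attempt is the initial start, so with 3 attempts the JobManager can be restarted after 2 failures. The Java sketch below only illustrates this arithmetic: it ignores Flink's attempt-failure validity interval (after which older failures are no longer counted), and the class YarnAttempts is hypothetical.

```java
// Hypothetical illustration of how the two attempt settings combine.
final class YarnAttempts {

    // Assumption: yarn.resourcemanager.am.max-attempts acts as an upper bound on
    // yarn.application-attempts, so the effective limit is the minimum of the two.
    static int effectiveAttempts(int flinkApplicationAttempts, int yarnAmMaxAttempts) {
        if (flinkApplicationAttempts < 1 || yarnAmMaxAttempts < 1) {
            throw new IllegalArgumentException("attempt counts must be >= 1");
        }
        return Math.min(flinkApplicationAttempts, yarnAmMaxAttempts);
    }

    // The first attempt is the initial start; the remaining ones are restarts after failures.
    static int toleratedJobManagerFailures(int effectiveAttempts) {
        return effectiveAttempts - 1;
    }

    public static void main(String[] args) {
        // Values from this article: yarn.application-attempts: 3, am.max-attempts: 4
        int attempts = effectiveAttempts(3, 4);
        System.out.println("effective attempts: " + attempts
                + ", tolerated JobManager failures: " + toleratedJobManagerFailures(attempts));
    }
}
```

Setting yarn.application-attempts above the YARN value gains nothing under this assumption, which is why the configuration comment keeps it within the yarn-site.xml limit.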