当前位置: 首页 > news >正文

有服务器还需要买网站空间吗青岛做企业网站

有服务器还需要买网站空间吗,青岛做企业网站,贵州省健康码二维码图片下载,专业简章目录 ☄️前置工作 fenfa脚本 #x1f30b;概述 ☄️Flink是什么 ☄️特点#xff08;多nb#xff09; ☄️应用场景#xff08;不用看#xff09; ☄️分层API #x1f30b;配环境 ☄️wordcount ☄️WcDemoUnboundStreaming #x1f30b;集群部署 ☄️集…目录 ☄️前置工作 fenfa脚本 概述 ☄️Flink是什么 ☄️特点多nb ☄️应用场景不用看 ☄️分层API 配环境 ☄️wordcount ☄️WcDemoUnboundStreaming  集群部署 ☄️集群角色 ☄️集群规划 webUI提交作业 命令行提交作业 ​编辑 ☄️部署模式 会话模式Session Mode 单作业模式Per-Job Mode 应用模式Application Mode ☄️standalone运行模式  会话模式部署  单作业模式部署 应用模式部署  ☄️YARN 运行模式重点  会话模式部署  单作业模式部署  应用模式部署  ☄️历史服务器 深入运行流程 ☄️总体 ☄️核心概念 并行度设置 算子链 任务槽 ​编辑 任务槽和并行度的关系  ☄️作业提交流程 Standalone 会话模式作业提交流程 ☄️ Yarn 应用模式作业提交流程  ☄️前置工作 fenfa脚本 使用方法配置环境变量路径~/binfenfa 文件相对路径、绝对路径都行 #!/bin/bash #1. 判断参数个数 if [ $# -lt 1 ] thenecho Not Enough Arguement!exit; fi #2. 遍历集群所有机器 for host in hadoop2 hadoop3 doecho $host #3. 遍历所有目录挨个发送for file in $do#4. 判断文件是否存在if [ -e $file ]then#5. 获取父目录pdir$(cd -P $(dirname $file); pwd)#6. 获取当前文件的名称fname$(basename $file)ssh $host mkdir -p $pdirrsync -av $pdir/$fname $host:$pdirecho $pdir/$fname have trans !!!!!!!!!!!!!elseecho !!!!!!$file does not exists!!!!!!fidone done概述 ☄️Flink是什么 Apache Flink® — Stateful Computations over Data Streams | Apache Flink Apache Flink 是一个框架和分布式处理引擎用于对无界和有界数据流进行状态计算。Flink 被设计为在所有常见的集群环境中运行以内存速度和任何规模执行计算。 把流处理需要的额外数据保存成一个“状态”然后针对这条数据进行处理并且更新状态。这就是所谓的“有状态的流处理”。 无界数据流 有定义流的开始但没有定义流的结束会无休止的产生数据无界流的数据必须持续处理即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理因为输入是无限的 有界数据流 有定义流的开始也有定义流的结束有界流可以在摄取所有数据后再进行计算有界流所有数据可以被排序所以并不需要有序摄取有界流处理通常被称为批处理。 ☄️特点多nb 低延迟、高吞吐、结果的准确性和良好的容错性。 Flink主要特点如下 ⚫ 高吞吐和低延迟。每秒处理数百万个事件毫秒级延迟。 ⚫ 结果的准确性。Flink提供了事件时间event-time和处理时间processing-time语义。 对于乱序事件流事件时间语义仍然能提供一致且准确的结果。 ⚫ 精确一次exactly-once的状态一致性保证。 ⚫ 可以连接到最常用的外部系统如Kafka、Hive、JDBC、HDFS、Redis等。 ⚫ 高可用。本身高可用的设置加上与K8sYARN和Mesos的紧密集成再加上从故障中 快速恢复和动态扩展任务的能力Flink能做到以极少的停机时间7×24全天候运行。 Flink vs SparkStreaming  Spark以批处理为根本 • Spark数据模型Spark 采用 RDD 模型Spark Streaming 的 DStream 实际上也就是一组组小批数据 RDD 的集合 • Spark运行时架构Spark 是批计算将 DAG 划分为不同的 stage一个完成后才可以计算下一个 Flink以流处理为根本 • Flink数据模型Flink 基本数据模型是数据流以及事件Event序列 • Flink运行时架构Flink 是标准的流执行模式一个事件在一个节点处理完后可以直接发往下一个节点进行处理 ☄️应用场景不用看 1电商和市场营销 举例实时数据报表、广告投放、实时推荐 2物联网IOT 举例传感器实时数据采集和显示、实时报警交通运输业 3物流配送和服务业 举例订单状态实时更新、通知信息推送 4银行和金融业 举例实时结算和通知推送实时检测异常行为 ☄️分层API ⚫ 越顶层越抽象表达含义越简明使用越方便 ⚫ 越底层越具体表达能力越丰富使用越灵活 DataStream API流处理和DataSet API批处理封装了底层处理函数提供了通用的模块比如转换transformations包括map、flatmap等连接joins聚合aggregations窗口windows操作等。 注意Flink1.12以后DataStream API已经实现真正的流批一体所以DataSet API已经过时 配环境 IDEA创建Maven环境 导入配置项 properties flink.version1.17.0/flink.version /properties dependencies dependency groupIdorg.apache.flink/groupId artifactIdflink-streaming-java/artifactId version${flink.version}/version /dependency dependency groupIdorg.apache.flink/groupId artifactIdflink-clients/artifactId version${flink.version}/version /dependency /dependencies ☄️wordcount dataSet public static void main(String[] args) throws Exception {// TODO 准备环境ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();// TODO read fileString path data/goodnight.txt;DataSourceString dataSource env.readTextFile(path);// TODO mapFlatMapOperatorString, Tuple2String, Integer wordOne dataSource.flatMap(new FlatMapFunctionString, Tuple2String, Integer() {Overridepublic void flatMap(String value, CollectorTuple2String, Integer out) throws Exception {// 指定输出元组String words value.replace(,, ).replace(., );String[] words2 words.split( );for (String word : words2) {Tuple2String, Integer stringIntegerTuple2 Tuple2.of(word, 1);// 指定输出元组out.collect(stringIntegerTuple2);}}});// TODO group shuffleUnsortedGroupingTuple2String, Integer wordOneGroup wordOne.groupBy(0); // 表示元祖0AggregateOperatorTuple2String, Integer sum wordOneGroup.sum(1); // 表示元祖1sum.print();} dataStreaming public static void main(String[] args) throws Exception {// TODO 准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// TODO read fileString path data/goodnight.txt;DataStreamSourceString lines env.readTextFile(path);// TODO mapSingleOutputStreamOperatorTuple2String, Integer wordOne lines.flatMap(new FlatMapFunctionString, Tuple2String, Integer() {Overridepublic void flatMap(String value, CollectorTuple2String, Integer out) throws Exception {String words1 value.replace(,, ).replace(., );String[] words2 words1.split( );for (String word : words2) {Tuple2String, Integer wordOne Tuple2.of(word, 1);out.collect(wordOne);}}});// TODO group shuffleKeyedStreamTuple2String, Integer, String group wordOne.keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer value) throws Exception {return value.f0;}});SingleOutputStreamOperatorTuple2String, Integer sum group.sum(1);sum.print();env.execute(); //类似spark行动算子} 体现出流处理来一条处理一条前面的编号是并行度cpu核数  ☄️WcDemoUnboundStreaming  无界流socket 文本流  public static void main(String[] args) throws Exception {// TODO 准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// TODO read file // String path data/goodnight.txt;DataStreamSourceString hadoop1 env.socketTextStream(centos7, 7777);// TODO mapSingleOutputStreamOperatorTuple2String, Integer wordOne hadoop1.flatMap((String value, CollectorTuple2String, Integer out) - {String[] words value.split( );for (String word : words) {Tuple2String, Integer wordAndOne Tuple2.of(word, 1);out.collect(wordAndOne);}}).returns(Types.TUPLE(Types.STRING, Types.INT));// Flink 还具有一个类型提取系统可以分析函数的输入和返回类型自动获取类型信息// 从而获得对应的序列化器和反序列化器。但是由于 Java 中泛型擦除的存在在某些特殊情// 况下比如 Lambda 表达式中自动提取的信息是不够精细的——只告诉 Flink 当前的元素// 由“船头、船身、船尾”构成根本无法重建出“大船”的模样这时就需要显式地提供类// 型信息才能使应用程序正常工作或提高其性能。// 因为对于flatMap 里传入的Lambda 表达式系统只能推断出返回的是Tuple2 类型而无// 法得到 Tuple2String, Long。只有显式地告诉系统当前的返回类型才能正确地解析出完整// 数据。// TODO group shuffle// 只有一个参数不写类型也行KeyedStreamTuple2String, Integer, String group wordOne.keyBy(value - value.f0);SingleOutputStreamOperatorTuple2String, Integer sum group.sum(1);sum.print();// TODO execenv.execute();} 在 Flink 中.returns() 方法用于指定数据流的返回类型。这对于 Flink 的类型推断和优化非常重要。在你的代码中你创建了一个 SingleOutputStreamOperatorTuple2String, Integer它表示输出流中的每个元素都是一个包含字符串和整数的元组。 .returns() 方法的目的是帮助 Flink 确定流的返回类型以便进行类型检查和优化。在你的代码中如果没有使用 .returns() 方法Flink 将尝试根据上下文来推断返回类型但有时推断可能不准确或不完整。 通过使用 .returns(Types.TUPLE(Types.STRING, Types.INT))你明确告诉 Flink你的数据流将返回一个元组其中第一个字段是字符串类型第二个字段是整数类型。这有助于 Flink 更准确地理解和优化你的代码并在运行时执行必要的类型检查。 集群部署 ☄️集群角色 Flink提交作业和执行任务需要几个关键组件 • 客户端Client代码由客户端获取并做转换之后提交给JobManger • JobManager就是Flink集群里的“管事人”对作业进行中央调度管理而它获取到要执行的作业 后会进一步处理转换然后分发任务给众多的TaskManager。 • TaskManager就是真正“干活的人”数据的处理操作都是它们来做的。 ☄️集群规划 自己建三个虚拟机hostname可能不一样 1. 下载安装包flink-1.17.0-bin-scala_2.12.tgz将该jar 包上传到hadoop102 节点服务器的/opt/software(随意记住就行) 路径上。 夸克网盘分享-flink-1.17.0-bin-scala_2.12 2. 在/opt/software 路径上解压flink-1.17.0-bin-scala_2.12.tgz 到/opt/module 路径上 tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/  3. 进入 conf 路径修改 flink-conf.yaml 文件指定 hadoop1 节点服务器为JobManager 修改flink-conf.yaml  # JobManager节点地址. jobmanager.rpc.address: hadoop102 jobmanager.bind-host: 0.0.0.0 rest.address: hadoop102 rest.bind-address: 0.0.0.0 # TaskManager节点地址.需要配置为当前机器名 taskmanager.bind-host: 0.0.0.0 taskmanager.host: hadoop102 在 Apache Flink 的配置文件中jobmanager.rpc.address 参数指定 JobManager 的 RPC 通信地址。在你的配置中jobmanager.rpc.address 被设置为 hadoop1这意味着 Flink JobManager 将绑定到主机名为 hadoop1 的地址。这是为了告诉 Flink 在启动 JobManager 时使用 hadoop1 作为其通信地址。 在 Apache Flink 的配置文件中taskmanager.bind-host 参数指定 TaskManager 绑定的网络接口地址。将其设置为 0.0.0.0 表示 TaskManager 将绑定到所有可用的网络接口使其可以接受来自任何网络接口的连接请求。这通常用于允许来自任何主机或网络的连接使 Flink 任务能够在广泛的网络范围内访问。  4. 修改workers 文件指定hadoop102、hadoop103 和hadoop104 为 TaskManager  hadoop102 hadoop103 hadoop104 5. 修改masters 文件  hadoop102:8081  另外在 flink-conf.yaml 文件中还可以对集群中的 JobManager 和TaskManager 组件 进行优化配置主要配置项如下  ⚫ jobmanager.memory.process.size对JobManager 进程可使用到的全部内存进行配置 包括 JVM 元空间和其他开销默认为1600M可以根据集群规模进行适当调整。  ⚫ taskmanager.memory.process.size对 TaskManager 进程可使用到的全部内存进行配置 包括 JVM 元空间和其他开销默认为1728M可以根据集群规模进行适当调整。  ⚫ taskmanager.numberOfTaskSlots对每个 TaskManager 能够分配的 Slot 数量进行配置 默认为1可根据TaskManager 所在的机器能够提供给Flink 的CPU 数量决定。所谓 Slot 就是 TaskManager 中具体运行一个任务所分配的计算资源。  ⚫ parallelism.defaultFlink 任务执行的并行度默认为1。优先级低于代码中进行的并 行度配置和任务提交时使用参数指定的并行度数量。 6. fenfa flink-1.17.0分别修改另外两个flink-conf.yaml中的 taskmanager.host: hadoop103  taskmanager.host: hadoop104  7. 原神启动 (base) [roothadoop1 bin]# bash start-cluster.sh Starting cluster. Starting standalonesession daemon on host hadoop1. Starting taskexecutor daemon on host hadoop1. Starting taskexecutor daemon on host hadoop2. Starting taskexecutor daemon on host hadoop3.webUI提交作业 IDEA中pom.xml导入打包配置 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.yuange/groupIdartifactIdFlink/artifactIdversion1.0-SNAPSHOT/versiondependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/version/dependency/dependenciespropertiesflink.version1.17.0/flink.versionmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.target/properties!-- 打包插件 --buildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion3.2.4/versionexecutionsexecutionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationartifactSetexcludesexcludecom.google.code.findbugs:jsr305/excludeexcludeorg.slf4j:*/excludeexcludelog4j:*/exclude/excludes/artifactSetfiltersfilter!-- Do not copy the signatures in the META-INF folder. Otherwise, this might cause SecurityExceptions when using the JAR. --artifact*:*/artifactexcludesexcludeMETA-INF/*.SF/excludeexcludeMETA-INF/*.DSA/excludeexcludeMETA-INF/*.RSA/exclude/excludes/filter/filterstransformers combine.childrenappendtransformerimplementationorg.apache.maven.plugins.shade.resource.ServicesResourceTransformer/transformer/transformers/configuration/execution/executions/plugin/plugins/build /project 出现 hadoop1启动nc -lk 7777           提交 stop 命令行提交作业 bin/flink run -m hadoop102:8081 -c com.atguigu.wc.SocketStreamWordCount ./Flink-1.0-SNAPSHOT.jar 在浏览器中打开 Web UIhttp://hadoop102:8081 查看应用执行情况。  用 netcat 输入数据可以在 TaskManager 的标准输出Stdout看到对应的统计结果  ☄️部署模式 主要有以下三种会话模式Session Mode、单作业模式Per-Job Mode、应用模式Application Mode。  它们的区别主要在于集群的生命周期以及资源的分配方式以及应用的 main 方法到底 在哪里执行——客户端Client还是 JobManager。 会话模式Session Mode 需要先启动一个集群保持一个会话在这个会话中通过客户端提交作业。集群启动时所有资源就都已经确定所以所有提交的作业会竞争集群中的资源。 会话模式比较适合于单个规模小、执行时间短的大量作业。 单作业模式Per-Job Mode 会话模式因为资源共享会导致很多问题所以为了更好地隔离资源我们可以考虑为每个提交的 作业启动一个集群这就是所谓的单作业Per-Job模式 一个作业提交 现启动一个集群 作业完成后集群就会关闭所有资源也会释放。 这些特性使得单作业模式在生产环境运行更加稳定所以是实际应用的首选模式。 需要注意的是Flink本身无法直接这样运行所以单作业模式一般需要借助一些资源管理框架来启动集群比如YARN、KubernetesK8S 应用模式Application Mode 前面提到的两种模式下应用代码都是在客户端上执行然后由客户端提交给JobManager的。但是这种方式客户端需要占用大量网络带宽去下载依赖和把二进制数据发送给JobManager加上很多情况下我们提交作业用的是同一个客户端就会加重客户端所在节点的资源消耗 所以解决办法就是我们不要客户端了直接把应用提交到JobManger上运行。而这也就代表着我们需要为每一个提交的应用单独启动一个JobManager也就是创建一个集群。这个JobManager只为执行这一个应用而存在执行结束之后JobManager也就关闭了这就是所谓的应用模式。 应用模式与单作业模式都是提交作业之后才创建集群单作业模式是通过客户端来提交的客户端解析出的每一个作业对应一个集群而应用模式下是直接由JobManager执行应用程序的1 ☄️standalone运行模式  会话模式部署  独立模式是独立运行的不依赖任何外部的资源管理平台当然独立也是有代价的如果资源不足或者出现故障没有自动扩展或重分配资源的保证必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下。  之前webUI提交作业就是这种提前启动集群并通过 Web 页面客户端提交任务可以多个任务但是集群资源固定。  单作业模式部署 Flink 的 Standalone 集群并不支持单作业模式部署。因为单作业模式需要借助一些资源管 理平台。  应用模式部署  应用模式下不会提前创建集群所以不能调用 start-cluster.sh 脚本。我们可以使用同样在bin 目录下的standalone-job.sh 来创建一个 JobManager。 1. 启动 nc -lk 7777 2. Flinkxxx-jar 包放到 lib/目录下 3. 启动 JobManager。 bin/standalone-job.sh start --job-classname com.yuange.wc.WcDemoUnboundStreaming 直接指定作业入口类脚本会到 lib 目录扫描所有的 jar 包。  4. 同样是使用 bin 目录下的脚本启动 TaskManager。  bin/taskmanager.sh start  5.stop bin/taskmanager.sh stop  bin/standalone-job.sh stop  ☄️YARN 运行模式重点  YARN 上部署的过程是客户端把 Flink 应用提交给 Yarn 的 ResourceManagerYarn 的ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上Flink 会部署JobManager 和 TaskManager 的实例从而启动集群。Flink 会根据运行在 JobManger 上的作业所需要的Slot 数量动态分配TaskManager 资源。  在 YARN 上JobManager 和 TaskManager 运行在 YARN 容器中而这些容器由 YARN 的 ResourceManager 分配给 NodeManager 执行。NodeManager 负责监控和管理容器确保它们按照 ResourceManager 的指示执行。NodeManager 提供了容器的隔离和资源管理。 配置环境变量增加环境变量配置如下  HADOOP_HOME/opt/module/hadoop-3.3.4 看自己的在哪 export PATH$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin  export HADOOP_CONF_DIR${HADOOP_HOME}/etc/hadoop  export HADOOP_CLASSPATHhadoop classpath这是反引号esc键下边 启动Hadoop 集群包括HDFS 和YARN。 执行以下命令启动 netcat。 nc -lk 7777  会话模式部署  YARN 的会话模式与独立集群略有不同需要首先申请一个 YARN 会话YARN Session来启动Flink 集群。具体步骤如下  执行脚本命令向 YARN 集群申请资源开启一个YARN 会话启动Flink 集群。 [atguiguhadoop102 flink-1.17.0]$ bin/yarn-session.sh -nm [-d]后台执行模式不占用shell) test  要关闭 Flink YARN 会话YARN session你可以使用 yarn-session.sh 脚本具体步骤如下 打开终端窗口并登录到运行 Flink 会话的机器上。 进入 Flink 的 bin 目录通常是 Flink 安装目录下的 bin 子目录。 运行 yarn-session.sh 脚本并提供 -id 参数后接正在运行的 YARN 会话的 ID可以在启动时找到以关闭指定的会话。例如 ./yarn-session.sh -id session_id stop 其中 session_id 是你要关闭的 YARN 会话的 ID。通过提供会话的 ID你可以确保关闭正确的会话。 等待脚本执行完成。脚本将向 ResourceManager 发送关闭请求并等待会话成功终止。一旦会话关闭你将在终端上看到相应的消息。 请确保提供正确的会话 ID以免关闭错误的会话。如果你不确定会话 ID可以使用 yarn application -list 命令来列出当前运行的 YARN 应用程序并找到你要关闭的会话的 ID。 在关闭 Flink YARN 会话之后你可以通过 YARN ResourceManager 或 Flink Dashboard 来验证会话是否已成功终止。 单作业模式部署  在 YARN 环境中由于有了外部平台做资源调度所以我们也可以直接向 YARN 提交一 个单独的作业从而启动一个 Flink 集群。 启动nc -lk 7777 bin/flink run -t yarn-per-job \(固定  -c com.yuange.wc.WcDemoUnboundStreaming Flink-1.0-SNAPSHOT.jar  -d 表示以detached模式运行 Flink 作业。在这种模式下作业会在后台运行不会阻塞当前终端并且你可以关闭终端而不影响作业的运行。这对于长时间运行的作业或需要后台运行的作业非常有用。 -t 参数表示要运行的作业类型其中 yarn-per-job 是一种作业运行模式。在 yarn-per-job 模式下每个 Flink 作业都会启动一个独立的 YARN 会话该会话运行作业并在作业完成后终止。这与 Flink 的 YARN session cluster 模式不同YARN session cluster在 YARN 集群上维护一个长时间运行的 Flink 集群。 客户端xshell停止不影响任务运行在webUI点cancel job 应用模式部署  应用模式同样非常简单与单作业模式类似直接执行 flink run-application 命令即可。  1命令行提交  bin/flink run-application -t yarn-application -c com.yuange.wc.WcDemoUnboundStreaming Flink-1.0-SNAPSHOT.jar  2上传HDFS 提交  可以通过yarn.provided.lib.dirs 配置选项指定位置将 flink 的依赖上传到远程。  1上传flink 的lib 和 plugins 到HDFS 上 yarn每次都要上传Flink自身的依赖到HDFS不如自己先上传 http://hadoop1:9870/explorer.html#/ bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirshdfs://hadoop1:8020/flink-dist -c com.yuange.wc.WcDemoUnboundStreaming hdfs://hadoop1:8020/flink-jars/Flink-1.0-SNAPSHOT.jar ☄️历史服务器 运行 Flink job 的集群一旦停止只能去 yarn 或本地磁盘上查看日志不再可以查看作业挂掉之前的运行的 Web UI很难清楚知道作业在挂的那一刻到底发生了什么。如果我们还没有 Metrics 监的话那么完全就只能通过日志去分析和定位问题了所以如果能还原之前的 Web UI我们可以通过 UI 发现和定位一些问题。  Flink 提供了历史服务器用来在相应的 Flink 集群关闭后查询已完成作业的统计信息。我都知道只有当作业处于运行中的状态才能够查看到相关的 WebUI 统计信息。通过 History Server 我们才能查询这些已完成作业的统计信息无论是正常退出还是异常退出。 此外它对外提供了 REST API它接受 HTTP 请求并使用 JSON 数据进行响应。Flink 任务停止后JobManager 会将已经完成任务的统计信息进行存档History Server 进程则在任务停止后可以对任务统计信息进行查询。比如最后一次的 Checkpoint、任务运行时的相关配置。  hadoop fs -mkdir -p /logs/flink-job  修改conf/flink-conf.yaml, 拉到下边 # # HistoryServer ## The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)# Directory to upload completed jobs to. Add this directory to the list of # monitored directories of the HistoryServer as well (see below). jobmanager.archive.fs.dir: hdfs:///hadoop1:8020/logs/flink-his# The address under which the web-based HistoryServer listens. historyserver.web.address: hadoop1# The port under which the web-based HistoryServer listens. historyserver.web.port: 8082# Comma separated list of directories to monitor for completed jobs. historyserver.archive.fs.dir: hdfs:///hadoop1:8020/logs/flink-his# Interval in milliseconds for refreshing the monitored directories. historyserver.archive.fs.refresh-interval: 5000 3启动历史服务器  bin/historyserver.sh start  4停止历史服务器  bin/historyserver.sh stop  深入运行流程 ☄️总体 1JobMaster  JobMaster 是 JobManager 中最核心的组件负责处理单独的作业Job。所以 JobMaster和具体的Job 是一一对应的多个 Job 可以同时运行在一个Flink 集群中, 每个 Job 都有一个自己的JobMaster。需要注意在早期版本的 Flink 中没有 JobMaster 的概念而 JobManager 的概念范围较小实际指的就是现在所说的 JobMaster。          在作业提交时JobMaster 会先接收到要执行的应用。JobMaster 会把 JobGraph 转换成一个物理层面的数据流图这个图被叫作“执行图”ExecutionGraph它包含了所有可以并发执行的任务。JobMaster 会向资源管理器ResourceManager发出请求申请执行任务必要的资源。一旦它获取到了足够的资源就会将执行图分发到真正运行它们的 TaskManager上。 2资源管理器ResourceManager          ResourceManager 主要负责资源的分配和管理在Flink 集群中只有一个。所谓“资源” 主要是指TaskManager 的任务槽task slots。任务槽就是Flink 集群中的资源调配单元包含 了机器用来执行计算的一组 CPU 和内存资源。每一个任务Task都需要分配到一个 slot 上 执行。          这里注意要把 Flink 内置的 ResourceManager 和其他资源管理平台比如 YARN的 ResourceManager 区分开。          3分发器Dispatcher          Dispatcher 主要负责提供一个 REST 接口用来提交应用并且负责为每一个新提交的作业启动一个新的JobMaster 组件。Dispatcher 也会启动一个Web UI用来方便地展示和监控作业执行的信息。Dispatcher 在架构中并不是必需的在不同的部署模式下可能会被忽略掉。         4任务管理器TaskManager TaskManager 是Flink 中的工作进程数据流的具体计算就是它来做的。Flink 集群中必须至少有一个TaskManager每一个TaskManager 都包含了一定数量的任务槽task slots。Slot是资源调度的最小单位slot 的数量限制了TaskManager 能够并行处理的任务数量。  启动之后TaskManager 会向资源管理器注册它的 slots收到资源管理器的指令后TaskManager 就会将一个或者多个槽位提供给JobMaster 调用JobMaster 就可以分配任务来执行了。          在执行过程中TaskManager 可以缓冲数据还可以跟其他运行同一应用的 TaskManager交换数据。  ☄️核心概念 并行度设置 当要处理的数据量非常大时我们可以把一个算子操作“复制”多份到多个节点数据来了之后就可以到其中任意一个执行。这样一来一个算子任务就被拆分成了多个并行的“子任务”(subtasks再将它们分发到不同节点就真正实现了并行计算。          在 Flink 执行过程中每一个算子operator可以包含一个或多个子任务operator subtask这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行 这样包含并行子任务的数据流就是并行数据流它需要多个分区stream partition来分配并行任务。一般情况下一个流程序的并行度可以认为就是其所有算子中最大的并行度。一个程序中不同的算子可能具有不同的并行度。          例如如上图所示当前数据流中有source、map、window、sink 四个算子其中sink 算 子的并行度为1其他算子的并行度都为 2。所以这段流处理程序的并行度就是 2。  1代码中设置  我们在代码中可以很简单地在算子后跟着调用setParallelism()方法来设置当前算子的 并行度  stream.map(word - Tuple2.of(word, 1L)).setParallelism(2); 这种方式设置的并行度只针对当前算子有效。  另外我们也可以直接调用执行环境的 setParallelism()方法全局设定并行度  env.setParallelism(2);  这样代码中所有算子默认的并行度就都为 2 了。我们一般不会在程序中设置全局并行 度因为如果在程序中对全局并行度进行硬编码会导致无法动态扩容。  这里要注意的是由于 keyBy 不是算子所以无法对keyBy 设置并行度并行度通常是针对可以并行执行的算子的属性用于确定同时执行多个任务的数量。对于 keyBy 来说它并不直接执行任何计算而是仅仅重新组织数据流以便后续的操作。实际的并行度设置通常是在后续的算子上 2提交应用时设置  在使用 flink run 命令提交应用时可以增加-p 参数来指定当前应用程序执行的并行度 它的作用类似于执行环境的全局设置  bin/flink run –p 2 –c com.atguigu.wc.SocketStreamWordCount   ./FlinkTutorial-1.0-SNAPSHOT.jar  3) 配置文件中设置  我们还可以直接在集群的配置文件 flink-conf.yaml 中直接更改默认并行度  parallelism.default: 2  这个设置对于整个集群上提交的所有作业有效初始值为 1。无论在代码中设置、还是提交时的-p 参数都不是必须的所以在没有指定并行度的时候就会采用配置文件中的集群默认并行度。在开发环境中没有配置文件默认并行度就是当前机器的CPU 核心数 算子链 1一对一One-to-oneforwarding          这种模式下数据流维护着分区以及元素的顺序。比如图中的source和map算子source算子读取数据之后可以直接发送给 map 算子做处理它们之间不需要重新分区也不需要调整数据的顺序。这就意味着 map 算子的子任务看到的元素个数和顺序跟 source 算子的子任务产生的完全一样保证着“一对一”的关系。map、filter、flatMap 等算子都是这种 one-to-one 的对应关系。这种关系类似于Spark 中的窄依赖。  2重分区Redistributing          在这种模式下数据流的分区会发生改变。比如图中的map 和后面的keyBy/window 算子之间以及keyBy/window 算子和Sink 算子之间都是这样的关系。          每一个算子的子任务会根据数据传输的策略把数据发送到不同的下游目标任务。这些传输方式都会引起重分区的过程这一过程类似于 Spark 中的 shuffle 在Flink 中并行度相同的一对一one to one算子操作可以直接链接在一起形成一个“大”的任务task这样原来的算子就成为了真正任务里的一部分如下图所示。每个task 会被一个线程执行。这样的技术被称为“算子链”Operator Chain。 Flink 默认会按照算子链的原则进行链接合并如果我们想要禁止合并或者自行定义也可以在代码中对算子做一些特定的设置 // 禁用算子链  .map(word - Tuple2.of(word, 1L)).disableChaining(); // 从当前算子开始新链  .map(word - Tuple2.of(word, 1L)).startNewChain() 任务槽 很显然TaskManager 的计算资源是有限的并行的任务越多每个线程的资源就会越少。(一锅饭吃的人越多每个人吃的越少) 那一个 TaskManager 到底能并行处理多少个任务呢为了控制并发量我们需要在TaskManager 上对每个任务运行所占用的资源做出明确的划分这就是所谓的任务槽task slots。  每个任务槽task slot其实表示了 TaskManager 拥有计算资源的一个固定大小的子集。这些资源就是用来独立执行一个子任务的。 假如一个TaskManager有三个slot那么它会将管理的内存平均分成三份每个slot独自占据一份。这样一来我们在slot上执行一个子任务时相当于划定了一块内存“专款专用”就不需要跟来自其他作业的任务去竞争内存资源了。 所以现在我们只要2个TaskManager就可以并行处理分配好的5个任务了。 在Flink的/opt/module/flink-1.17.0/conf/flink-conf.yaml配置文件中可以设置TaskManager 的 slot 数量默认是 1 个 slot。  taskmanager.numberOfTaskSlots: 8  需要注意的是slot 目前仅仅用来隔离内存不会涉及 CPU 的隔离。在具体应用时可以将slot 数量配置为机器的CPU 核心数尽量避免不同任务之间对CPU 的竞争。这也是开发环境默认并行度设为机器CPU 数量的原因 当我们将资源密集型和非密集型的任务同时放到一个 slot 中它们就可以自行分配对资源占用的比例从而保证最重的活平均分配给所有的 TaskManager。  当然Flink 默认是允许 slot 共享的如果希望某个算子对应的任务完全独占一个 slot或者只有某一部分算子共享 slot我们也可以通过设置“slot 共享组”手动指定 .map(word - Tuple2.of(word, 1L)).slotSharingGroup(1); 任务槽的共享组_bilibili   有点混需要用的时候看几分钟 任务槽和并行度的关系  任务槽和并行度都跟程序的并行执行有关但两者是完全不同的概念。简单来说任务槽是 静 态 的 概 念 是 指 TaskManager 具 有 的 并 发 执 行 能 力 可 以 通 过 参 数taskmanager.numberOfTaskSlots 进行配置而并行度是动态概念也就是 TaskManager 运行程序时实际使用的并发能力可以通过参数 parallelism.default 进行配置 举例说明假设一共有3个TaskManager每一个TaskManager中的slot数量设置为3个那么一共有9 个task slot表示集群最多能并行执行9 个同一算子的子任务。  而我们定义word count 程序的处理操作是四个转换算子  source→ flatmap→ reduce→ sink          当所有算子并行度相同时容易看出source 和flatmap 可以合并算子链于是最终有三个 任务节点。  如果我们没有任何并行度设置而配置文件中默认parallelism.default1那么程序运行的默 认并行度为1总共有3个任务。由于不同算子的任务可以共享任务槽所以最终占用的slot 只有1个。9个slot只用了1个有8个空闲 作业并行度设置为2那么总共有6个任务共享任务槽之后会占用2个slot。同样就有7个slot空闲计算资源没有充分利用。所以可以看到设置合适的并行度才能提高效率 怎样设置并行度效率最高呢当然是需要把所有的slot都利用起来。考虑到slot共享我们可以直接把并行度设置为9这样所有27个任务就会完全占用9个slot。这是当前集群资源下能执行的最大并行度计算资源得到了充分的利用。 另外再考虑对于某个算子单独设置并行度的场景。例如如果我们考虑到输出可能是写入文件那会希望不要并行写入多个文件就需要设置sink算子的并行度为1。这时其他的算子并行度依然为9所以总共会有19个子任务。根据slot共享的原则它们最终还是会占用全部的9个slot而sink任务只在其中一个slot上执行。 ☄️作业提交流程 Standalone 会话模式作业提交流程 逻辑流图——作业流图——执行流图——物理流图 Web UI点击作业就能看到对应的作业图 ☄️ Yarn 应用模式作业提交流程
http://www.zqtcl.cn/news/279556/

相关文章:

  • 大型门户网站开发北京网站建设管庄
  • 大连建设工程网站网站建设组织管理怎么写
  • wordpress英文站注册域名需要注意什么
  • 营销型网站的建设重点是什么深圳logo设计公司排名
  • 做网站的用什么软件呢网站排名优化服务公司
  • 网站开发完整视频网站集约化建设较好的城市
  • 网站建设和平面设计应用网站如何做
  • 自己做网站需要多少费用asa8.4 做网站映射
  • 商业网站 模板黑龙江省建设厅安全员考试
  • 网站新备案不能访问室内装修网站模板
  • 工程师报考网站wordpress设置视频图片不显示图片
  • 徐州网站建设公司排名成都住建平台
  • 用来备案企业网站国外免费外贸网站
  • 网页背景做的比较好的网站做一个企业网站价格
  • 免费制图网站县级门户网站建设的报告
  • 北京网站建设网怎么用手机做一个网站
  • 网站建设管理办法关于公司门户网站建设的议案
  • 网站开发入职转正申请书体验好的网站
  • 在线精品课程网站开发网站备案号怎么修改
  • 网站建设 风险百度热搜的含义
  • 怎样创作网站公司做网站 要准备哪些素材
  • 网站上的平面海报怎么做南阳企业做网站
  • 佛山公众平台网站推广多少钱wordpress如何调用分类目录
  • 网站推广应该注意什么信息发布平台推广
  • 官方网站案例做网站私活在哪接
  • 做网站滨州wordpress 不同域名
  • 找人做设计的网站广州做网站(信科网络)
  • 如何选择网站做站方向青之峰网站建设
  • 福州哪家网站制作设计高端还实惠设计logo的理念
  • 吉林市网站建设促销式软文案例