合肥建设厅网站,推广英文,怎么做一个好的销售,网站开发 图形验证码Flink 中的 Slot、Task、Subtask、并行度 1.并行度2.Task 与线程3.算子链与 slot 共享资源组4.Task slots 与系统资源5.总结 我们在使用 Flink 时#xff0c;经常会听到 task#xff0c;slot#xff0c;线程 以及 并行度 这几个概念#xff0c;对于初学者来说#xff0c;这… Flink 中的 Slot、Task、Subtask、并行度 1.并行度2.Task 与线程3.算子链与 slot 共享资源组4.Task slots 与系统资源5.总结 我们在使用 Flink 时经常会听到 taskslot线程 以及 并行度 这几个概念对于初学者来说这几个概念以及它们与内存CPU 之间的关系经常搞不清楚下面我们就通过这篇文章来弄清楚这些概念。
1.并行度
特定算子的子任务subtask的 个数 称之为 并行度parallel。一般情况下一个 数据流的并行度 可以认为是其 所有算子中最大的并行度。Flink 中每个算子都可以在代码中通过 .setParallelism(n) 来重新设置并行度而并行执行的 subtask 要发布到不同的 slot 中去执行。
2.Task 与线程
对于分布式执行的任务Flink 将算子的 subtasks 链接成 tasks。每个 subtask 由一个线程执行。如下图中样例数据流用 5 个 subtask 执行因此就有 5 个并行线程。 上图中source map 算子组成一个 subtask并行度为 2keyby window apply 算子组成一个 subtask并行度为 2sink 算子组成一个 subtask并行度为 1。
3.算子链与 slot 共享资源组
前面提到 Flink 会将算子的 subtask 链接成 task实际上就是通过算子链操作来实现的。将算子链接成 task 的好处
✅ 它减少线程间切换、缓冲的开销并且减少延迟的同时增加整体吞吐量。✅ 链行为是可以配置的将两个算子链接在一起能使得它们在同一个线程中执行从而提升性能。
Flink 默认会将能链接的算子尽可能地进行链接例如两个 map 转换操作。 此外 Flink 还提供了对链接更细粒度控制的 API 以满足更多需求。
如果想对整个作业禁用算子链可以调用 StreamExecutionEnvironment.disableOperatorChaining()。下列方法还提供了更细粒度的控制。需要注意的是 这些方法只能在 DataStream 转换操作后才能被调用因为它们只对前一次数据转换生效。例如可以 someStream.map(...).startNewChain() 这样调用而不能 someStream.startNewChain() 这样。
另外一个 slot 共享资源组对应着 Flink 中的一个 slot 槽 可以根据需要手动地将各个算子隔离到不同的 slot 中。
Transformation Description Start new chain以当前 operator 为起点开始新的连接。如下的两个 mapper 算子会链接在一起而 filter 算子则不会和第一个 mapper 算子进行链接。someStream.filter(...).map(...).startNewChain().map(...)。Disable chaining任何算子不能和当前算子进行链接。someStream.map(...).disableChaining()。Set slot sharing group配置算子的资源组。Flink 会将相同资源组的算子放置到同一个 slot 槽中执行并将不同资源组的算子分配到不同的 slot 槽中从而实现 slot 槽隔离。如果所有输入操作都在同一个资源组资源组将从输入算子开始继承。Flink 默认的资源组名称为 default算子可以显式调用 slotSharingGroup(default) 加入到这个资源组中 .someStream.filter(...).slotSharingGroup(name)。
4.Task slots 与系统资源
每个 workerTaskManager都是一个 JVM 进程可以在单独的线程中执行一个或多个 subtask。为了控制一个 TaskManager 中接受多少个 task就有了所谓的 task slot至少一个。
每个 task slot 代表 TaskManager 中 资源的固定子集。例如具有 3 个 slot 的 TaskManager会将其托管内存 1 / 3 1/3 1/3 用于每个 slot。分配资源意味着 subtask 不会与其他作业的 subtask 竞争托管内存而是具有一定数量的保留托管内存。注意此处没有 CPU 隔离当前 slot 仅分离 task 的托管内存。
通过调整 task slot 的数量用户可以定义 subtask 如何互相隔离。每个 TaskManager 有一个 slot这意味着每个 task 组都在单独的 JVM 中运行例如可以在单独的容器中启动。具有多个 slot 意味着更多 subtask 共享同一 JVM。同一 JVM 中的 task 共享 TCP 连接通过多路复用和心跳信息。它们还可以共享数据集和数据结构从而减少了每个 task 的开销。 上边例子从图所示 5 个 subtask 用 5 个 task slot 来执行一定是这样分配的吗
这个还真不一定默认情况下上边例子只需要 2 个 slot 就可以了。 我们再看另外一个例子当我们把并行度调大为 6。 按照并行度拆开这个任务task我们发现会有 13 个 subtask那么是不是就意味着需要 13 个 slot 才能执行该任务呢
答案是否定的实际是只需要 6 个 slot 就够了。
为什么会这样呢我们来看两条规则
1️⃣ 默认情况下Flink 允许子任务共享 slot即使他们是不同任务的子任务。这样的结果就是一个 slot 可以保存作业的整个 pipeline。2️⃣ Task Slot 是静态的概念指的是 TaskManager 具有的并发执行能力。 实际上第一个 slot 会运行 3 个 subtask也就是执行 3 个线程。
前面也提到了 slot 只是做了内存隔离并没有做 CPU 隔离但是 CPU 资源是有限的所以我们在设置资源参数时需要考虑一下集群可提供的资源。 那么问题又来了上面这个图中所示需要 5 个 task slot但是默认情况下 Flink 会自动优化成为需要 2 个 slot如果我们不想使用默认的 slot 个数来执行呢那就要通过 slot 共享组来实现了。
DataStreamString inputDataStream env.socketTextStream(host, port);
DataStreamTuple2String, Integer resultStream inputDataStream.flatMap(new WordCount.MyFlatMapper()).slotSharingGroup(green).keyBy(0).sum(1).setParallelism(2).slotSharingGroup(red);resultStream.print().setParallelism(1);这几行代码几个 subtask并行度是多少用几个 task slot
看一下以上代码运行时 Flink Web UI 从 Web UI 界面可以看出该任务被切分成了 5 个子 task按照最大并行度算子来算这个任务的并行度应该为 2那么这 5 个 subtask 占用了几个 slot 呢
通过设置 slotSharingGroup是手动干预 slot 分配的手段之一默认情况下整个 StreamGraph 都会用一个默认的 default SlotSharingGroup即所有的 task 都可以共用一个 slot。
上面代码里source 算子并没有显式分配 slot 共享组所以它将被分在默认的 default 共享组里而 flatMap 算子被显式指定到了 green 共享组里聚合算子同样被显式指定到了 red 共享组里那么最后的 sink 算子呢注意默认情况下每一个算子会与其前一个算子保持在同一个共享组内所以 sink 算子也就是上边的打印算子也会被分配在 red 共享组里按照 slot 共享组进行分组每个分组最大的并行度相加就是这个任务所占用的总共 slot所以应该是 4 个。 5.总结
通过上面几个例子我们已经很清楚的理解这些概念了总结以下几点
1️⃣ Flink 中 slot 是任务执行所申请资源的最小单元同一个 TaskManager 上的所有 slot 都只是做了内存分离并没有做 CPU 隔离。2️⃣ 每一个 TaskManager 都是一个 JVM 进程如果某个 TaskManager 上只有一个 slot这意味着每个 task 组都在单独的 JVM 中运行如果有多个 slot 就意味着更多 subtask 共享同一 JVM。3️⃣ 一般情况下有多少个 subtask就是有多少个并行线程而并行执行的 subtask 要发布到不同的 slot 中去执行。4️⃣ Flink 默认会将能链接的算子尽可能地进行链接也就是算子链Flink 会将同一个算子链分组内的 subtask 都发到同一个 slot 去执行也就是说一个 slot 可能要执行多个 subtask即多个线程。5️⃣ Flink 可以根据需要手动地将各个算子隔离到不同的 slot 中。6️⃣ 一个任务所用的总共 slot 为所有资源隔离组所占用的 slot 之和同一个资源隔离组内按照算子的最大并行度来分配 slot。