网站转发代码,石家庄公司网站设计,招一个程序员可以做网站吗,增城新塘网站建设转换算子Transformation 概述基本转换算子映射Map扁平映射flatMap过滤Filter 聚合算子按键分区keyBy归约聚合reduce简单聚合sum、min、max、minBy、maxBy 物理分区算子随机分配轮询分配重缩放广播全局分区自定义分区 分流操作Filter分流SideOutPut分流Split分流 合流操作联合Un… 转换算子Transformation 概述基本转换算子映射Map扁平映射flatMap过滤Filter 聚合算子按键分区keyBy归约聚合reduce简单聚合sum、min、max、minBy、maxBy 物理分区算子随机分配轮询分配重缩放广播全局分区自定义分区 分流操作Filter分流SideOutPut分流Split分流 合流操作联合Union连接ConnectCoMap、CoFlatMapCoProcessFunction 算子链和资源组创建新链禁止链接配置Slot共享组 用户自定义函数UDF函数类匿名函数富函数类 概述 转换算子Transformation是ApacheFlink中用于对数据流进行处理和转换的操作。在Flink中数据流被抽象为一个有向无环图DAG转换算子可以将数据流的每个元素进行操作并生成新的数据流。 因此Flink中的转换算子是指对输入数据流进行转换操作的一类算子它是将一个或多个DataStream转换为新的DataStream 特点
转换算子接受一个或多个输入数据流并产生一个或多个输出数据流。每个转换算子都代表一个具体的数据处理操作可以在数据流上执行诸如映射、过滤、聚合、分组等操作。转换算子可以按照不同的方式组合在一起形成复杂的数据流处理逻辑。可以通过方法链式调用来连接多个转换算子形成具体的数据处理流程。常见Flink转换算子 通过组合这些转换算子可以构建出复杂的数据处理流程实现各种业务逻辑的数据处理和分析。 Map对输入数据流中的每个元素应用一个函数并将函数的输出作为输出数据流中的元素FlatMap与 Map 类似但可以通过一个函数返回多个输出元素Filter根据指定的条件过滤输入数据流中的元素并将符合条件的元素作为输出数据流中的元素KeyBy按照指定的键对输入数据流分组并返回分组后的数据流Reduce对输入数据流中的每个分组应用一个函数进行聚合并将聚合结果作为输出数据流中的元素Aggregations针对分组后的数据流进行聚合可以使用 Sum、Max、Min、Avg 等内置函数Window按照指定的时间或者大小划分输入数据流并在窗口上应用聚合函数Join将两个输入数据流中的元素按照指定的键进行连接基本转换算子 Flink中基本的转换算子有Map、Filter 和 FlatMap。它们分别用于对每个输入元素应用一个函数根据指定的条件过滤输入元素以及将每个输入元素映射为多个输出元素。 映射Map Map算子接受一个函数作为参数该函数将输入数据流中的每个元素映射为一个新的元素并将这些新元素组成一个输出数据流。 注意通常在使用Flink算子的时候可以使用匿名类、Lambda、实现类
方式一传入匿名类实现MapFunction public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceInteger stream env.fromCollection(Arrays.asList(1, 2, 3));/*** MapFunction实现类的泛型类型与输入数据类型和输出数据的类型有关。* * 实现MapFunction接口时需要指定两个泛型分别是输入和输出的类型还需要重写map()方法定义从一个输入事件转换为另一个输出事件的具体逻辑*/SingleOutputStreamOperatorString map stream.map(new MapFunctionInteger, String() {Overridepublic String map(Integer integer) throws Exception {return 数字: integer;}});map.print();env.execute();}方式二使用Lambda表达式 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceInteger stream env.fromCollection(Arrays.asList(1, 2, 3));SingleOutputStreamOperatorString map stream.map((MapFunctionInteger, String) integer - 数字: integer);map.print();env.execute();}方式三传入MapFunction的实现类 public static void main(String[] args) throws Exception {class MyMap implements MapFunctionInteger, String {Overridepublic String map(Integer integer) throws Exception {return 数字: integer;}}StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceInteger stream env.fromCollection(Arrays.asList(1, 2, 3));stream.map(new MyMap()).print();env.execute();}扁平映射flatMap FlatMap算子接受一个函数作为参数该函数将输入数据流中的每个元素映射为多个新的元素并将这些新元素组成一个输出数据流。 flatMap是先按照某种规则对数据进行打散拆分再对拆分后的元素做转换处理。具有消费一个元素可以产生0到多个元素的特点。 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString stream env.fromCollection(Arrays.asList(h e l l o));stream.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String s, CollectorString collector) throws Exception {String[] split s.split( );for (String word : split) {collector.collect(word);}}}).print();env.execute();}6 h
6 e
6 l
6 l
6 o过滤Filter Filter算子接受一个函数作为参数该函数返回一个布尔值表示是否应该保留输入数据流中的当前元素。如果该函数返回true则当前元素被保留否则当前元素被丢弃。 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceInteger stream env.fromCollection(Arrays.asList(1, 2, 3));stream.filter(new FilterFunctionInteger() {Overridepublic boolean filter(Integer a) throws Exception {return a ! 2;}}).print();env.execute();}聚合算子
Flink提供了多种聚合算子用于对数据流进行各种不同类型的聚合操作。
按键分区keyBy 在Flink中要做聚合需要先进行分区分区操作是通过keyBy来完成的。keyBy 是 Flink 中的一个操作符用于将数据流按照指定的 key 进行分区 在 Flink 中数据流被分为多个分区每个分区都有一个或多个并行的任务来处理数据。keyBy 操作符会将数据流中具有相同 key 的数据分配到同一个分区中从而保证相同 key 的数据被同一个任务处理。 keyBy 操作符常用于聚合操作例如对某个字段进行求和、求平均值等。在使用 keyBy 操作符时需要指定一个或多个字段作为 key这些字段的值相同的数据将被分配到同一个分区中。 在逻辑上将流划分为不相交的分区。具有相同 key 的记录都分配到同一个分区。在内部 keyBy() 是通过哈希分区实现的。有多种指定 key 的方式。 注意 在Flink内部它是通过计算key的哈希值来对分区数进行取模运算实现的。因此如果Key是一个POJO对象应该必须重写 hashCode()方法。 KeyBy是将DataStream转KeyedStream public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString stream env.fromCollection(Arrays.asList(ab, abc, bc, ab, bc));// 以数据源本身作为 key 做一个分区操作KeyedStreamString, String keyedStream stream.keyBy(new KeySelectorString, String() {Overridepublic String getKey(String s) throws Exception {return s;}});keyedStream.print();env.execute();}keyBy得到的结果将不再是 DataStream而是会将 DataStream 转换为KeyedStream
11 abc
4 ab
3 bc
4 ab
3 bc归约聚合reduce Flink的归约聚合reduce算子是一种常用的聚合操作它对数据流中的元素进行逐个聚合将当前元素与上一个聚合结果合并得到一个单一的结果 在相同 key 的数据流上“滚动”执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值。 reduce对已有的数据进行归约处理把每一个新输入的数据和当前已经归约出来的值再做一个聚合计算。在相同key的数据流上持续滚动执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值。注意 Reduce是将KeyedStream 转 DataStream 定义一个输入数据流其包含了一系列整数通过reduce算子对流进行归约聚合将相邻元素相加得到最终结果。 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceInteger stream env.fromCollection(Arrays.asList(1, 4, 6, 3, 2, 5));KeyedStreamInteger, Boolean keyBy stream.keyBy(number - number 3);keyBy.reduce(new ReduceFunctionInteger() {Overridepublic Integer reduce(Integer value1, Integer value2) throws Exception {System.out.println(value1 value1 value2 value2); return value1 value2; // 将两个值相加}}).print();env.execute();}在流处理的底层实现过程中实际上是将中间合并结果作为任务的一个状态保存起来之后每来一个新的数据就和之前的聚合状态进一步做归约
1 1
8 4
value1 1 value2 3
value1 4 value2 6
1 4
8 10
value1 4 value2 2
value1 10 value2 5
1 6
8 15简单聚合sum、min、max、minBy、maxBy 有了按键分区的数据流 KeyedStream就可以基于它进行聚合操作。使用聚合算子可以方便地对数据流进行各种不同类型的聚合操作从而帮助实现各种复杂的分析和处理需求。 聚合算子主要包括以下几种
sum对数据流中的元素求和min求数据流中的最小值minBy根据指定的键值求数据流中的最小值并返回该最小值所在的全部元素max求数据流中的最大值maxBy根据指定的键值求数据流中的最大值并返回该最大值所在的全部元素转换算子需要实现自定义函数聚合需要指定字段即可。指定字段的方式有两种指定位置和指定名称。
对于元组类型的数据可以使用这两种方式来指定字段。需注意元组中字段的名称是以 f0、f1、f2、…来命名。数据流的类型是POJO类那么就只能通过字段名称来指定不能通过位置来指定。sum public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceInteger stream env.fromCollection(Arrays.asList(1, 4, 6, 3, 2, 5));KeyedStreamInteger, Boolean keyBy stream.keyBy(number - number 3);SingleOutputStreamOperatorInteger max keyBy.sum(0);max.print();env.execute();}max public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceInteger stream env.fromCollection(Arrays.asList(1, 4, 6, 3, 2, 5));KeyedStreamInteger, Boolean keyBy stream.keyBy(number - number 3);SingleOutputStreamOperatorInteger max keyBy.max(0);max.print();env.execute();}min
SingleOutputStreamOperatorInteger max keyBy.min(0);minBy public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceTuple2String, Integer dataStreamSource env.fromElements(Tuple2.of(key1, 1), Tuple2.of(key2, 2), Tuple2.of(key1, 3), Tuple2.of(key2, 4));KeyedStreamTuple2String, Integer, String keyedStream dataStreamSource.keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer tuple) throws Exception {return tuple.f0;}});// 按第二个字段选择最小值SingleOutputStreamOperatorTuple2String, Integer minBy keyedStream.minBy(1);minBy.print();env.execute();}maxBy // 按第二个字段选择最大值
SingleOutputStreamOperatorTuple2String, Integer minBy keyedStream.maxBy(1);物理分区算子 Flink 也提供以下方法让用户根据需要在数据转换完成后对数据分区进行更细粒度的配置。 物理分区算子Physical Partitioning是Flink中用于将数据流划分到不同物理分区的一种技术。通过将数据流划分到不同的分区可以实现负载均衡、并行计算和提高吞吐量等优化。 Flink提供了多种物理分区算子可以根据不同的需求选择适合的分区策略。常见的物理分区策略有随机分配Random、轮询分配Round-Robin、重缩放Rescale和广播Broadcast 随机分配 随机分配Random是一种将数据随机分配到各个分区的策略。每个分区接收的数据量可能不同适用于无需考虑数据倾斜的场景。 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSourceInteger dataStreamSource env.fromElements(1, 2, 3, 4, 5, 6);DataStreamInteger partition dataStreamSource.shuffle();partition.print();env.execute();}2 4
1 1
2 5
1 2
1 3
1 6轮询分配 轮询分配Round-Robin是一种均匀地按照轮询顺序将数据分配到各个分区的策略。每个分区依次接收一个元素然后再轮到下一个分区如此循环。 DataStreamSourceInteger dataStreamSource env.fromElements(1, 2, 3, 4, 5, 6);// rebalance可以解决数据源倾斜的场景
DataStreamInteger partition dataStreamSource.rebalance();2 1
2 3
2 5
1 2
1 4
1 6重缩放 重缩放分区和轮询分区非常相似。缩放Rescale是一种根据数据的大小动态调整分区的策略。它根据每个分区中的数据量进行动态的分区调整使各个分区的数据负载均衡。 DataStreamSourceInteger dataStreamSource env.fromElements(1, 2, 3, 4, 5, 6);
DataStreamInteger partition dataStreamSource.rescale();1 1
2 2
1 3
2 4
1 5
2 6广播 广播Broadcast是一种将数据复制到所有分区的策略即所有分区都会接收到相同的数据。广播适合于需要在所有分区上进行全局操作的场景。 DataStreamSourceInteger dataStreamSource env.fromElements(1, 2, 3, 4, 5, 6);
DataStreamInteger partition dataStreamSource.broadcast();1 1
2 1
1 2
2 2
1 3
2 3
1 4
2 4
1 5
2 5
1 6
2 6注意广播会在每个并行任务之间复制数据并占用更多的内存和网络带宽因此应慎重使用
全局分区 全局分区Global Partitioning是一种特殊的分区方式。它将所有的输入流数据都发送到下游算子的第一个并行子任务中去。 注意
它可能会产生数据倾斜问题。由于所有数据都发送到同一个并行任务这个任务可能会成为瓶颈并导致性能下降相当于强行让下游任务并行度变成了1所以使用这个操作需要非常谨慎可能对程序造成很大的压力DataStreamSourceInteger dataStreamSource env.fromElements(1, 2, 3, 4, 5, 6);
// 全部数据发往第一个子任务
DataStreamInteger partition dataStreamSource.global();1 1
1 2
1 3
1 4
1 5
1 6自定义分区 在Flink中可以使用partitionCustom()方法来实现自定义的分区策略。通过自定义分区可以根据自己的业务需求将数据合理地分发到不同的分区中。 自定义分区需要实现Partitioner接口并重写其中的partition()方法。partition()方法接收一个键和键的总数作为参数并返回要分配到的分区编号从0开始。 通过自定义分区策略可以根据具体的业务场景将数据分配到合适的分区中用于实现更精确的数据处理和控制。 自定义分区器将奇数和偶数分到不同的分区中
public class MyPartitioner implements PartitionerInteger {Overridepublic int partition(Integer key, int numPartitions) {if (key % 2 0) {// 将偶数分配到第一个分区return 0;} else {// 将奇数分配到第二个分区return 1;}}
}在Flink程序中使用自定义分区 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSourceInteger dataStreamSource env.fromCollection(Arrays.asList(1, 2, 3));// DataStreamInteger partitionCustom dataStreamSource.partitionCustom(new MyPartitioner(), value - value);DataStreamInteger partitionCustom dataStreamSource.partitionCustom(new MyPartitioner(), new KeySelectorInteger, Integer() {Overridepublic Integer getKey(Integer value) throws Exception {return value;}});partitionCustom.print();env.execute();}2 1
1 2
2 3分流操作 分流就是将一条数据流拆分成完全独立的两条、甚至多条流。分流就是基于一个DataStream定义一些筛选条件将符合条件的数据拣选出来放到对应的流里。 把输入源按照需要进行拆分比如期望把订单流按照金额大小进行拆分或者把用户访问日志按照访问者的地理位置进行拆分等。 Filter分流 Filter算子用来根据用户输入的条件进行过滤每个元素都会被 filter() 函数处理如果 filter() 函数返回 true 则保留否则丢弃。那么用在分流的场景可以做多次 filter把需要的不同数据生成不同的流。 针对同一条流多次独立调用.filter()方法进行筛选就可以得到拆分之后的流。 根据奇偶性将输入流分流成两个输出流 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceInteger stream env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));SingleOutputStreamOperatorInteger even stream.filter(value - value % 2 0);SingleOutputStreamOperatorInteger odd stream.filter(value - value % 2 1);even.print(偶数-job);odd.print(奇数-job);env.execute();}偶数-job:1 2
奇数-job:1 5
偶数-job:3 4
奇数-job:9 1
奇数-job:11 3直接使用filter来实现分流效果存在缺点
1.代码显得有些冗余2.同一个数据要被处理两遍调用两次filter不够高效SideOutPut分流 使用侧输出流SideOutput也可以实现数据的分流操作。侧输出流允许将不符合主要流处理逻辑的数据发送到一个或多个辅助输出流中。 使用步骤
1.使用OutputTag定义两个侧输出流的标签evenTag和oddTag2.通过processElement()方法的逻辑主要处理逻辑将负数发送到主输出流而偶数、奇数通过ctx.output()方法发送到侧输出流evenTag与oddTag3.最后使用getSideOutput()方法获取侧输出流通过侧输出流机制可以将不符合主要处理逻辑的数据单独处理以实现更灵活的数据分流和处理 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSourceInteger stream env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, -6));/*** 创建OutputTag对象* 分别指定: 标签名、放入侧输出流的数据类型(Typeinformation)*/OutputTagInteger evenTag new OutputTag(even, Types.INT);OutputTagInteger oddTag new OutputTag(odd, Types.INT);// 使用process算子SingleOutputStreamOperatorInteger process stream.process(new ProcessFunctionInteger, Integer() {Overridepublic void processElement(Integer value, Context ctx, CollectorInteger out) throws Exception {if (value 0) {if (value % 2 0) {// 偶数放到侧输出流evenTag中// 调用上下文对象ctx的output方法,分别传入 Tag对象、放入侧输出流中的数据ctx.output(evenTag, value);} else if (value % 2 1) {// 奇数放到侧输出流oddTag中ctx.output(oddTag, value);}} else {// 负数 数据放到主流中out.collect(value);}}});// 在主流中根据标签 获取 侧输出流SideOutputDataStreamInteger even process.getSideOutput(evenTag);SideOutputDataStreamInteger odd process.getSideOutput(oddTag);// 打印主流process.printToErr(主流-负数-job);//打印 侧输出流even.print(偶数-job);odd.print(奇数-job);env.execute();}奇数-job:1 1
偶数-job:2 2
奇数-job:1 3
偶数-job:2 4
奇数-job:1 5
主流-负数-job:2 -6Split分流 Split也是将流进行切分的方法需要在split算子中定义OutputSelector然后重写其中的select方法将不同类型的数据进行标记最后对返回的SplitStream使用select方法将对应的数据选择出来。 注意 使用split算子切分过的流是不能进行二次切分的 Split算子在Flink较新版本中已弃用推荐使用SideOutPut进行流的拆分 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//获取数据源List data new ArrayListTuple3Integer,Integer,Integer();data.add(new Tuple3(0,1,0));data.add(new Tuple3(0,2,2));data.add(new Tuple3(0,1,1));DataStreamSourceTuple3Integer,Integer,Integer items env.fromCollection(data);SplitStreamTuple3Integer, Integer, Integer splitStream items.split(new OutputSelectorTuple3Integer, Integer, Integer() {Overridepublic IterableString select(Tuple3Integer, Integer, Integer value) {ListString tags new ArrayList();if (value.f0 0) {tags.add(zero);} else if (value.f0 1) {tags.add(one);}return tags;}});splitStream.select(zero).print();splitStream.select(one).printToErr();//打印结果String jobName zero one streaming;env.execute(jobName);}合流操作
联合Union 联合操作将两个或多个数据流联合来创建一个包含所有流中数据的新流。这种合并操作会保留所有输入流的顺序。流中的数据类型必须相同合并之后的新流会包括所有流中的元素数据类型不变。 注意 如果一个数据流和自身进行联合这个流中的每个数据将在合并后的流中出现两次。 流的联合受限于数据类型不能改变缺乏灵活性 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceInteger source1 env.fromElements(1, 2, 3);DataStreamSourceInteger source2 env.fromElements(4, 5, 6);DataStreamSourceString source3 env.fromElements(7, 8, 9);DataStreamInteger union source1.union(source2).union(source3.map(number - Integer.valueOf(number)));
// DataStreamInteger union source1.union(source2, source3.map(number - Integer.valueOf(number)));union.print();env.execute();}1
2
3
4
5
6
7
8
9连接Connect 在Flink中可以使用connect()操作符将两个或多个流连接在一起以形成ConnectedStreams。连接流提供一种将不同类型的流合并在一起的方式通过它可以对每个流应用不同的处理逻辑但它们会共享相同的上下文信息。 使用connect合并流一次只能连接2条流流的数据类型可以不一样连接后可以调用 map、flatmap、process来处理 DataStreamSourceInteger source1 env.fromElements(1, 2, 3);
DataStreamSourceString source2 env.fromElements(4, 5, 6);
ConnectedStreamsInteger, String connect source1.connect(source2);CoMap、CoFlatMap 类似于在连接的数据流上进行 map 和 flatMap。 如果是调用.map()就需要传入一个CoMapFunction需要实现map1()、map2()两个方法
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceInteger source1 env.fromElements(1, 2, 3);DataStreamSourceString source2 env.fromElements(4, 5, 6);ConnectedStreamsInteger, String connect source1.connect(source2);SingleOutputStreamOperatorInteger result connect.map(new CoMapFunctionInteger, String, Integer() {Overridepublic Integer map1(Integer value) throws Exception {return value;}Overridepublic Integer map2(String value) throws Exception {return Integer.parseInt(value);}});result.print();env.execute();}调用.flatMap()
connectedStreams.flatMap(new CoFlatMapFunctionInteger, String, String() {Overridepublic void flatMap1(Integer value, CollectorString out) {out.collect(value.toString());}Overridepublic void flatMap2(String value, CollectorString out) {for (String word: value.split( )) {out.collect(word);}}
});CoProcessFunction 调用.process()时传入一个CoProcessFunction。它需要实现的就是processElement1()、processElement2()两个方法 假设有两个输入流将这两个流合并计算得到每个key对应的合计并输出结果流 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceTuple2String, Integer source1 env.fromElements(Tuple2.of(key1, 1), Tuple2.of(key2, 4), Tuple2.of(key1, 2));DataStreamSourceTuple2String, Integer source2 env.fromElements(Tuple2.of(key1, 3), Tuple2.of(key2, 5), Tuple2.of(key2, 6));ConnectedStreamsTuple2String, Integer, Tuple2String, Integer connect source1.connect(source2);// 进行keyby操作将key相同数据放到一起ConnectedStreamsTuple2String, Integer, Tuple2String, Integer connectKeyby connect.keyBy(s1 - s1.f0, s2 - s2.f0);/*** 对2个流中相同key的值求和*/SingleOutputStreamOperatorString process connectKeyby.process(new CoProcessFunctionTuple2String, Integer, Tuple2String, Integer, String() {MapString, Integer map new HashMap();/*** 第一条流的处理逻辑* param value 第一条流的数据* param ctx 上下文* param out 采集器* throws Exception*/Overridepublic void processElement1(Tuple2String, Integer value, Context ctx, CollectorString out) throws Exception {String key value.f0;if (!map.containsKey(key)) {// 如果key不存在则将值直接put进mapmap.put(key, value.f1);} else {// key存在,则计算获取上一次put的值 本次的值Integer total map.get(key) value.f1;map.put(key, total);}out.collect(processElement1 key key value value total map.get(key));}/*** 第二条流的处理逻辑* param value 第二条流的数据* param ctx 上下文* param out 采集器* throws Exception*/Overridepublic void processElement2(Tuple2String, Integer value, Context ctx, CollectorString out) throws Exception {String key value.f0;if (!map.containsKey(key)) {// 如果key不存在则将值直接put进mapmap.put(key, value.f1);} else {// key存在,则计算获取上一次put的值 本次的值Integer total map.get(key) value.f1;map.put(key, total);}out.collect(processElement2 key key value value total map.get(key));}});process.print();env.execute();}3 processElement1 key key2 value (key2,4)total 4
4 processElement1 key key1 value (key1,1)total 1
4 processElement2 key key1 value (key1,3)total 4
4 processElement1 key key1 value (key1,2)total 6
3 processElement2 key key2 value (key2,5)total 9
3 processElement2 key key2 value (key2,6)total 15算子链和资源组 将两个算子链接在一起能使得它们在同一个线程中执行从而提升性能。Flink默认会将能链接的算子尽可能地进行链接(例如两个map转换操作)。 此外 Flink 还提供了对链接更细粒度控制的 API 以满足更多需求
如果想对整个作业禁用算子链可以调用 StreamExecutionEnvironment.disableOperatorChaining()。下列方法还提供了更细粒度的控制。需要注意的是这些方法只能在 DataStream 转换操作后才能被调用因为它们只对前一次数据转换生效。例如可以 someStream.map(…).startNewChain() 这样调用而不能 someStream.startNewChain() 这样。
一个资源组对应着 Flink 中的一个 slot 槽更多细节请看 slots 。 你可以根据需要手动地将各个算子隔离到不同的 slot 中。
任务链和资源组 ( Task chaining and resource groups ) 也是 Flink 提供的底层 API用于控制任务链和资源分配。默认情况下如果操作允许 (例如相邻的两次 map 操作) 则 Flink 会尝试将它们在同一个线程内进行从而可以获取更好的性能。但是 Flink 也允许用户自己来控制这些行为这就是任务链和资源组 API
创建新链 使用startNewChain基于当前算子创建一个新的算子链。 例如后面两个map将被链接起来而 filter 和第一个 map 不会链接在一起。
someStream.filter(...).map(...).startNewChain().map(...);禁止链接 disableChaining操作用于禁止将其他操作与当前操作放置于同一个任务链中 someStream.map(...).disableChaining();配置Slot共享组 为某个算子设置slot共享组。Flink会将同一个slot共享组的算子放在同一个slot 中而将不在同一slot共享组的算子保留在其它slot 中。这可用于隔离slot 。 如果所有输入算子都属于同一个slot共享组那么slot共享组从将继承输入算子所在的slot。slot共享组的默认名称是 “default”可以调用slotSharingGroup(“default”) 来显式地将算子放入该组 someStream.filter(...).slotSharingGroup(name);用户自定义函数UDF 在Flink中可以通过自定义函数UDFUser-Defined Function来实现对数据流的自定义操作。 UDF可以用于转换、过滤、聚合等操作以满足特定的业务需求。也就是说用户可以根据自身需求重新实现算子的逻辑。 用户自定义函数分为函数类、匿名函数、富函数类。 要定义自定义函数需要实现相应的函数接口具体取决于希望实现的功能。 函数类 函数类Function Classes是最通用、最灵活的一种用户定义函数的方式。可以创建一个实现特定函数接口的类并在类中实现函数的逻辑。常见的函数接口有MapFunction、FilterFunction、ReduceFunction、AggregatgeFunction等等。 public static void main(String[] args) throws Exception {class MyMapFunction implements MapFunctionInteger, String {Overridepublic String map(Integer value) {return 数字 value; // 将整数转换为字符串}}StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceInteger stream env.fromCollection(Arrays.asList(1, 4, 6, 3, 2, 5));SingleOutputStreamOperatorString map stream.map(new MyMapFunction());map.print();env.execute();}2 数字 3
1 数字 6
4 数字 5
12 数字 4
11 数字 1
3 数字 2匿名函数
匿名函数Anonymous Function是一种没有具体函数类定义的函数它只是在使用时定义。匿名函数通常用于一次性的简单操作不需要单独定义一个函数类。 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceInteger stream env.fromCollection(Arrays.asList(1, 4, 6, 3, 2, 5));SingleOutputStreamOperatorString map stream.map(new MapFunctionInteger, String() {Overridepublic String map(Integer value) {return 数字 value;}});map.print();env.execute();}在匿名函数中还可以自由地引用外部的变量 public static void main(String[] args) throws Exception {int threshold 3;StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceInteger stream env.fromCollection(Arrays.asList(1, 4, 6, 3, 2, 5));SingleOutputStreamOperatorInteger map stream.filter(new FilterFunctionInteger() {Overridepublic boolean filter(Integer value) {return value threshold; // 过滤大于阈值的值}});map.print();env.execute();}10 6
1 5
9 4富函数类 富函数类Rich Function Class是Function Class的一种特殊形式。富函数类不仅支持函数的逻辑定义还可以使用更多的生命周期和上下文信息如open()、close()和getRuntimeContext()等。 open()方法 是Rich Function的初始化方法会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前open()会首先被调用。 close()方法 生命周期中的最后一个调用的方法类似于结束方法。一般用来做一些清理工作。 public static void main(String[] args) throws Exception {class MyRichMapFunction extends RichMapFunctionInteger, String {Overridepublic String map(Integer value) {return 数字 value;}Overridepublic void open(Configuration parameters) throws Exception {// 初始化资源或状态System.out.println(并行子任务的索引 getRuntimeContext().getIndexOfThisSubtask() 生命周期开始);}Overridepublic void close() throws Exception {// 释放资源或保存状态super.close();System.out.println(并行子任务的索引 getRuntimeContext().getIndexOfThisSubtask() 生命周期结束);}}StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceInteger stream env.fromCollection(Arrays.asList(1, 2, 3));SingleOutputStreamOperatorString map stream.map(new MyRichMapFunction());map.print();env.execute();}并行子任务的索引0 生命周期开始
数字 1
数字 2
数字 3
并行子任务的索引0 生命周期结束env.setParallelism(2);并行子任务的索引0 生命周期开始
并行子任务的索引1 生命周期开始
2 数字 2
1 数字 1
1 数字 3
并行子任务的索引1 生命周期结束
并行子任务的索引0 生命周期结束