网站模板打包下载,网站与手机app是一体吗,荣县网站建设,wordpress主题king1.基本转换算子 基本转换算子说明映射#xff08;map#xff09;将数据流中的数据进行转换#xff0c;形成新的数据流过滤#xff08;filter#xff09;将数据流中的数据根据条件过滤扁平映射#xff08;flatMap#xff09;将数据流中的整体#xff08;如#xff1a;集…1.基本转换算子 基本转换算子说明映射map将数据流中的数据进行转换形成新的数据流过滤filter将数据流中的数据根据条件过滤扁平映射flatMap将数据流中的整体如集合拆分成个体使用。消费一个元素产生0到多个元素 package com.qiyu.Transformation;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** author MR.Liu* version 1.0* data 2023-10-19 11:00*/
public class Trans {/**** 映射 map 算子* param env*/public static void map(StreamExecutionEnvironment env){DataStreamInteger stream env.fromElements(1, 2, 3, 4, 5);//将集合中的元素值都 加上 100DataStreamInteger map stream.map(new MapFunctionInteger, Integer() {Overridepublic Integer map(Integer integer) throws Exception {return integer100;}});map.print();}/**** 过滤 filter 算子* param env*/public static void filter(StreamExecutionEnvironment env){DataStreamInteger stream env.fromElements(1, 2, 3, 4, 5);//将集合中的值取模不等于1的通行反之过滤DataStreamInteger filter stream.filter(new FilterFunctionInteger() {Overridepublic boolean filter(Integer integer) throws Exception {if (integer % 2 ! 1) {return true;}return false;}});filter.print();}/**** 扁平化 flatMap 算子* param env*/public static void flatMap(StreamExecutionEnvironment env){DataStreamString stream env.fromElements(Flink is a powerful framework for stream and batch processing,It provides support for event time processing);//将字符串以空格分隔拆成多个字符串个体stream.flatMap(new FlatMapFunctionString, Object() {Overridepublic void flatMap(String s, CollectorObject collector) throws Exception {String[] words s.split( );for (String word : words){collector.collect(word);}}}).print();}/*** 主程序类* param args* throws Exception*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//map(env);//filter(env);flatMap(env);env.execute();}
}2.聚合算子 聚合算子说明按键分区keyBy通过指定键key将一条流逻辑上划分为不同的分区。分区指的是并行任务的子任务对应着任务槽task solt简单聚合 sum():在输入流上对指定的字段做叠加求和的操作。 min()在输入流上对指定的字段求最小值。 max()在输入流上对指定的字段求最大值。 minBy():在输入流上针对指定字段求最小值。 maxBy():在输入流上针对指定字段求最大值。 归约聚合reduce可以把每一个新输入的数据和当前已经归约出来的值做聚合计算 package com.qiyu.Transformation;import com.qiyu.Source.Student;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;/*** author MR.Liu* version 1.0* data 2023-10-19 14:45*/
public class Aggregation {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceTuple2String, Integer stream env.fromElements(Tuple2.of(a, 1),Tuple2.of(a, 3),Tuple2.of(b, 3),Tuple2.of(b, 4));stream.keyBy(r - r.f0).print();stream.keyBy(r - r.f0).sum(1).print();stream.keyBy(r - r.f0).min(1).print();stream.keyBy(r - r.f0).max(1).print();stream.keyBy(r - r.f0).maxBy(1).print();stream.keyBy(r - r.f0).minBy(1).print();stream.keyBy(r - r.f0).reduce(new ReduceFunctionTuple2String, Integer() {Overridepublic Tuple2String, Integer reduce(Tuple2String, Integer t1, Tuple2String, Integer t2) throws Exception {return Tuple2.of(t1.f0, t1.f1 t2.f1);}}).print();env.execute();}
}