Writing Flink code follows a very fixed set of steps. Roughly, there are five, and as long as you keep them firmly in mind the rest comes easily:
1. env - prepare the environment
2. source - load the data
3. transformation - process/transform the data
4. sink - output the data
5. execute - run the job

DataStream API development
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/
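Before diving into the full examples, here is a minimal sketch of those five steps. The class name FiveStepSkeleton and the toy data are just for illustration; the API calls are the same ones used throughout this article:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FiveStepSkeleton {
    public static void main(String[] args) throws Exception {
        // 1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. source - load the data
        DataStream<String> source = env.fromElements("hello flink", "hello world");

        // 3. transformation - process/transform the data
        DataStream<String> upper = source.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        });

        // 4. sink - output the data
        upper.print();

        // 5. execute - run the job
        env.execute("five-step skeleton");
    }
}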
0. Add the dependencies
<properties>
    <flink.version>1.13.6</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>2.7.5-10.0</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
    </dependency>
</dependencies>

<build>
    <extensions>
        <extension>
            <groupId>org.apache.maven.wagon</groupId>
            <artifactId>wagon-ssh</artifactId>
            <version>2.8</version>
        </extension>
    </extensions>
    <plugins>
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>wagon-maven-plugin</artifactId>
            <version>1.0</version>
            <configuration>
                <!-- location of the local jar to upload -->
                <fromFile>target/${project.build.finalName}.jar</fromFile>
                <!-- remote copy destination -->
                <url>scp://root:root@bigdata01:/opt/app</url>
            </configuration>
        </plugin>
    </plugins>
</build>
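With the plugin configured as above, the packaged jar can then be pushed to the server in one command. This is only a sketch; it assumes the standard wagon-maven-plugin upload-single goal picks up the fromFile/url settings from the pom:

mvn clean package wagon:upload-single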
Write the code

package com.bigdata.day01;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount01 {
    /**
     * 1. env - prepare the environment
     * 2. source - load the data
     * 3. transformation - process/transform the data
     * 4. sink - output the data
     * 5. execute - run the job
     */
    public static void main(String[] args) throws Exception {
        // getExecutionEnvironment() works both for local development and on the cluster, which is very convenient
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // AUTOMATIC: decide between batch and streaming based on the nature of the source
        //env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // BATCH: compute the result in one go
        //env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // STREAMING is the default; print the results of both modes to get a feel for the difference between stream and batch
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        // Load the data. Polymorphic style: DataStreamSource is a subclass of DataStream
        DataStream<String> dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");

        DataStream<String> flatMapStream = dataStream01.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] arr = line.split(" ");
                for (String word : arr) {
                    // Emit every split word into the collector, which forms a new DataStream
                    collector.collect(word);
                }
            }
        });
        //flatMapStream.print();

        // Tuple2 is a 2-tuple
        DataStream<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1); // (hello,1)
            }
        });

        DataStream<Tuple2<String, Integer>> sumResult = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                return tuple2.f0;
            }
            // The 1 below means "sum over the second field of the tuple"
        }).sum(1);

        sumResult.print();

        // execute
        env.execute();
    }
}
Check the number of logical processors of your local CPU; that number is your number of partitions.

12> spark
13> kakfa
11> spark
11> flink
11> kafka
13> hadoop
12> sqoop
13> flink
12> flink

The number in front of each line is the partition index; by default it is tied to the number of logical processors.
Explanation of the results
What is batch and what is streaming? In the batch-mode result the leading index indicates the partition; the streaming-mode result is prefixed the same way. The number of partitions can also be changed like this: env.setParallelism(2);
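For reference, a small standalone sketch of how the default relates to the machine. The class name ParallelismCheck is just for illustration; availableProcessors() is plain Java, and on a local environment the default parallelism is normally the number of logical processors:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismCheck {
    public static void main(String[] args) {
        // Number of logical processors reported by the JVM
        System.out.println("logical processors: " + Runtime.getRuntime().availableProcessors());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Default parallelism (locally this usually equals the number of logical processors)
        System.out.println("default parallelism: " + env.getParallelism());

        // Override the parallelism for the whole job
        env.setParallelism(2);
        System.out.println("parallelism after override: " + env.getParallelism());
    }
}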
Code demo of parallelism
Both the environment and each individual operator can set or get its parallelism.
package com.bigdata.day01;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount01 {
    /**
     * 1. env - prepare the environment
     * 2. source - load the data
     * 3. transformation - process/transform the data
     * 4. sink - output the data
     * 5. execute - run the job
     */
    public static void main(String[] args) throws Exception {
        // getExecutionEnvironment() works both for local development and on the cluster, which is very convenient
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // AUTOMATIC: decide between batch and streaming based on the nature of the source
        //env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // BATCH: compute the result in one go
        //env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // STREAMING is the default; print the results of both modes to get a feel for the difference between stream and batch
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        // Set the parallelism of the job to 2
        //env.setParallelism(2);
        // Get the parallelism of the environment
        int parallelism = env.getParallelism();
        System.out.println(parallelism);

        // Load the data. Polymorphic style: DataStreamSource is a subclass of DataStream
        DataStream<String> dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");

        DataStream<String> flatMapStream = dataStream01.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] arr = line.split(" ");
                for (String word : arr) {
                    // Emit every split word into the collector, which forms a new DataStream
                    collector.collect(word);
                }
            }
        });
        // Every operator also has its own parallelism, which normally matches the environment
        System.out.println("flatMap parallelism: " + flatMapStream.getParallelism());
        //flatMapStream.print();

        // Tuple2 is a 2-tuple
        DataStream<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1); // (hello,1)
            }
        });

        DataStream<Tuple2<String, Integer>> sumResult = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                return tuple2.f0;
            }
            // The 1 below means "sum over the second field of the tuple"
        }).sum(1);

        sumResult.print();

        // execute
        env.execute();
    }
}

Package and upload. The target folder does not need to exist in advance; it will be created for you. Submit the job we developed and packaged:
flink run -c com.bigdata.day01.WordCount01 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

Check the run result in the web UI.
Because this job runs on the cluster, look for the output in the standard output (stdout) of the TaskManagers; if the first machine shows nothing, check the second one, and keep clicking through until you find it.
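As a side note, the parallelism can also be overridden at submission time with the CLI's -p option instead of being hard-coded, for example (same class and jar as above):

flink run -p 2 -c com.bigdata.day01.WordCount01 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar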
Utility class for reading main-method arguments

A path can be passed in from outside as a program argument. The following code handles both cases: if a path is given, the data is read from that path; otherwise the built-in default data is used.
package com.bigdata.day01;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount02 {
    /**
     * 1. env - prepare the environment
     * 2. source - load the data
     * 3. transformation - process/transform the data
     * 4. sink - output the data
     * 5. execute - run the job
     */
    public static void main(String[] args) throws Exception {
        // getExecutionEnvironment() works both for local development and on the cluster, which is very convenient
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // AUTOMATIC: decide between batch and streaming based on the nature of the source
        //env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // BATCH: compute the result in one go
        //env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // STREAMING is the default
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        // Set the parallelism of the job to 2
        //env.setParallelism(2);
        // Get the parallelism of the environment
        int parallelism = env.getParallelism();
        System.out.println(parallelism);

        // Chaining works because every operator returns a subclass of DataStream
        // The path used to be hard-coded; can it be passed in from outside? Of course, via args
        DataStream<String> dataStream = null;
        System.out.println(args.length);
        if (args.length != 0) {
            String path = args[0];
            dataStream = env.readTextFile(path);
        } else {
            dataStream = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");
        }

        dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] arr = line.split(" ");
                for (String word : arr) {
                    // Emit every split word into the collector, which forms a new DataStream
                    collector.collect(word);
                }
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1); // (hello,1)
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                return tuple2.f0;
            }
            // The 1 below means "sum over the second field of the tuple"
        }).sum(1).print();

        // execute
        env.execute();
    }
}

flink run -c com.bigdata.day01.WordCount02 FlinkDemo-1.0-SNAPSHOT.jar /home/wc.txt
Doing it this way is still different from how we ran jobs before. Previously the job was run like this:
flink run /opt/installs/flink/examples/batch/WordCount.jar --input /home/wc.txt
In that style the parameters carry a leading --, while ours do not.
Let's upgrade the code above: how do we write it so the parameter is passed with an --input prefix?
ParameterTool parameterTool = ParameterTool.fromArgs(args);
if (parameterTool.has("output")) {
    path = parameterTool.get("output");
}

Usage in code:
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String output = "";
if (parameterTool.has("output")) {
    output = parameterTool.get("output");
    System.out.println("An output path was specified, using: " + output);
} else {
    output = "hdfs://node01:9820/wordcount/output47_";
    System.out.println("An output path can be specified with --output; none was given, using the default: " + output);
}
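The snippet above only resolves the path. As an illustration that is not part of the original example, the resolved output could then be handed to a file sink, assuming a result stream named sumResult as in WordCount01 (writeAsText is the legacy file sink that still exists in Flink 1.13):

// Hypothetical continuation: write the word-count result to the resolved path instead of printing it
sumResult.writeAsText(output);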
The upgraded code:

package com.bigdata.day01;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount02 {
    /**
     * 1. env - prepare the environment
     * 2. source - load the data
     * 3. transformation - process/transform the data
     * 4. sink - output the data
     * 5. execute - run the job
     */
    public static void main(String[] args) throws Exception {
        // getExecutionEnvironment() works both for local development and on the cluster, which is very convenient
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // AUTOMATIC: decide between batch and streaming based on the nature of the source
        //env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // BATCH: compute the result in one go
        //env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // STREAMING is the default
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        // Set the parallelism of the job to 2
        //env.setParallelism(2);
        // Get the parallelism of the environment
        int parallelism = env.getParallelism();
        System.out.println(parallelism);

        // Chaining works because every operator returns a subclass of DataStream
        // The path used to be hard-coded; it can now be passed in from outside via args
        DataStream<String> dataStream = null;
        System.out.println(args.length);
        if (args.length != 0) {
            String path = "";
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            if (parameterTool.has("input")) {
                path = parameterTool.get("input");
            } else {
                path = args[0];
            }
            dataStream = env.readTextFile(path);
        } else {
            dataStream = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");
        }

        dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] arr = line.split(" ");
                for (String word : arr) {
                    // Emit every split word into the collector, which forms a new DataStream
                    collector.collect(word);
                }
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1); // (hello,1)
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                return tuple2.f0;
            }
            // The 1 below means "sum over the second field of the tuple"
        }).sum(1).print();

        // execute
        env.execute();
    }
}

DataStream (lambda expressions - optional extension)
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Desc: WordCount with the unified stream/batch DataStream API,
 * written in functional style with Java 8 lambdas
 */
public class WordCount02 {
    public static void main(String[] args) throws Exception {
        //TODO 1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setRuntimeMode(RuntimeExecutionMode.STREAMING); // streaming mode
        //env.setRuntimeMode(RuntimeExecutionMode.BATCH);     // batch mode
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);   // automatic
        // If not set, the default is streaming: defaultValue(RuntimeExecutionMode.STREAMING)

        //TODO 2. source - load the data
        DataStream<String> dataStream = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");

        //TODO 3. transformation - transform the data
        //3.1 split every line and flatten it
        /*
        public interface FlatMapFunction<T, O> extends Function, Serializable {
            void flatMap(T value, Collector<O> out) throws Exception;
        }
        */
        /*
        DataStream<String> wordsDS = dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        */
        // Note the Java 8 lambda syntax: (parameters) -> { body }
        DataStream<String> wordsDS = dataStream.flatMap((String value, Collector<String> out) -> {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(word);
            }
        }).returns(Types.STRING);

        //3.2 map every word to (word, 1)
        /*
        public interface MapFunction<T, O> extends Function, Serializable {
            O map(T value) throws Exception;
        }
        */
        /*
        DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        */
        DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS
                .map((String value) -> Tuple2.of(value, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        //3.3 group
        // Note: DataSet uses groupBy, DataStream uses keyBy
        //KeyedStream<Tuple2<String, Integer>, Tuple> keyedDS = wordAndOneDS.keyBy(0);
        /*
        public interface KeySelector<IN, KEY> extends Function, Serializable {
            KEY getKey(IN value) throws Exception;
        }
        */
        /*
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });
        */
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy((Tuple2<String, Integer> value) -> value.f0);

        //3.4 aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.sum(1);

        //TODO 4. sink - output the data
        result.print();

        //TODO 5. execute - run the job
        env.execute();
    }
}
There is a big pitfall here: after using a lambda expression you must add .returns(Types.STRING); (or the matching type information), otherwise the job fails with a type-extraction error. Because of that, writing lambdas here does not end up being that much faster after all.
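As an aside, an equivalent way to supply the missing type information is a TypeHint (org.apache.flink.api.common.typeinfo.TypeHint), which some people find more readable for nested generics. A sketch using the same wordsDS stream as above:

// Same mapping as step 3.2, but the result type is described with a TypeHint instead of Types.TUPLE
DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS
        .map((String value) -> Tuple2.of(value, 1))
        .returns(new TypeHint<Tuple2<String, Integer>>() {});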
The fully chained version is as follows:
package com.bigdata.day01;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount03 {
    /**
     * 1. env - prepare the environment
     * 2. source - load the data
     * 3. transformation - process/transform the data
     * 4. sink - output the data
     * 5. execute - run the job
     */
    public static void main(String[] args) throws Exception {
        // getExecutionEnvironment() works both for local development and on the cluster, which is very convenient
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // AUTOMATIC: decide between batch and streaming based on the nature of the source
        //env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // BATCH: compute the result in one go
        //env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // STREAMING is the default
        //env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        // Set the parallelism of the job to 2
        //env.setParallelism(2);
        // Get the parallelism of the environment
        int parallelism = env.getParallelism();
        System.out.println(parallelism);

        // Chaining works because every operator returns a subclass of DataStream
        // The path used to be hard-coded; it can be passed in from outside via args
        DataStream<String> dataStream = null;
        System.out.println(args.length);
        if (args.length != 0) {
            String path = "";
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            if (parameterTool.has("input")) {
                path = parameterTool.get("input");
            } else {
                path = args[0];
            }
            dataStream = env.readTextFile(path);
        } else {
            dataStream = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");
        }

        dataStream.flatMap((String line, Collector<String> collector) -> {
            String[] arr = line.split(" ");
            for (String word : arr) {
                // Emit every split word into the collector, which forms a new DataStream
                collector.collect(word);
            }
        }).returns(Types.STRING)
          .map((String word) -> {
              return Tuple2.of(word, 1); // (hello,1)
          }).returns(Types.TUPLE(Types.STRING, Types.INT))
          .keyBy((Tuple2<String, Integer> tuple2) -> {
              return tuple2.f0;
          })
          .sum(1)
          .print();

        // execute
        env.execute();
    }
}