当前位置: 首页 > news >正文

网站制作原理asp网站做文件共享上传

网站制作原理,asp网站做文件共享上传,广东莞建建设工程有限公司,在线做app的网站使用Flink编写代码#xff0c;步骤非常固定#xff0c;大概分为以下几步#xff0c;只要牢牢抓住步骤#xff0c;基本轻松拿下#xff1a; 1. env-准备环境 2. source-加载数据 3. transformation-数据处理转换 4. sink-数据输出 5. execute-执行 DataStream API开发 //n…使用Flink编写代码步骤非常固定大概分为以下几步只要牢牢抓住步骤基本轻松拿下 1. env-准备环境 2. source-加载数据 3. transformation-数据处理转换 4. sink-数据输出 5. execute-执行 DataStream API开发 //nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/ 0. 添加依赖 propertiesflink.version1.13.6/flink.version /propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_2.11/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_2.11/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge_2.11/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner-blink_2.11/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-shaded-hadoop-2-uber/artifactIdversion2.7.5-10.0/version/dependencydependencygroupIdlog4j/groupIdartifactIdlog4j/artifactIdversion1.2.17/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.24/version/dependency/dependenciesbuildextensionsextensiongroupIdorg.apache.maven.wagon/groupIdartifactIdwagon-ssh/artifactIdversion2.8/version/extension/extensionspluginsplugingroupIdorg.codehaus.mojo/groupIdartifactIdwagon-maven-plugin/artifactIdversion1.0/versionconfiguration!--上传的本地jar的位置--fromFiletarget/${project.build.finalName}.jar/fromFile!--远程拷贝的地址--urlscp://root:rootbigdata01:/opt/app/url/configuration/plugin/plugins/build 编写代码 package com.bigdata.day01;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class WordCount01 {/*** 1. env-准备环境* 2. source-加载数据* 3. transformation-数据处理转换* 4. sink-数据输出* 5. execute-执行*/public static void main(String[] args) throws Exception {// 导入常用类时要注意 不管是在本地开发运行还是在集群上运行都这么写非常方便StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 这个是 自动 根据流的性质决定是批处理还是流处理//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 批处理流 一口气把数据算出来// env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 流处理默认是这个 可以通过打印批和流的处理结果体会流和批的含义env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 获取数据 多态的写法 DataStreamSource 它是 DataStream 的子类DataStreamString dataStream01 env.fromElements(spark flink kafka, spark sqoop flink, kakfa hadoop flink);DataStreamString flatMapStream dataStream01.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String line, CollectorString collector) throws Exception {String[] arr line.split( );for (String word : arr) {// 循环遍历每一个切割完的数据放入到收集器中就可以形成一个新的DataStreamcollector.collect(word);}}});//flatMapStream.print();// Tuple2 指的是2元组DataStreamTuple2String, Integer mapStream flatMapStream.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String word) throws Exception {return Tuple2.of(word, 1); // (hello,1)}});DataStreamTuple2String, Integer sumResult mapStream.keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer tuple2) throws Exception {return tuple2.f0;}// 此处的1 指的是元组的第二个元素进行相加的意思}).sum(1);sumResult.print();// 执行env.execute();} } 查看本机的CPU的逻辑处理器的数量逻辑处理器的数量就是你的分区数量。 12 spark 13 kakfa 11 spark 11 flink 11 kafka 13 hadoop 12 sqoop 13 flink 12 flink前面的数字是分区数默认跟逻辑处理器的数量有关系。 对结果进行解释 什么是批什么是流 批处理结果前面的序号代表分区 流处理结果 也可以通过如下方式修改分区数量 env.setParallelism(2); 关于并行度的代码演示 系统以及算子都可以设置并行度或者获取并行度 package com.bigdata.day01;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class WordCount01 {/*** 1. env-准备环境* 2. source-加载数据* 3. transformation-数据处理转换* 4. sink-数据输出* 5. execute-执行*/public static void main(String[] args) throws Exception {// 导入常用类时要注意 不管是在本地开发运行还是在集群上运行都这么写非常方便StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 这个是 自动 根据流的性质决定是批处理还是流处理//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 批处理流 一口气把数据算出来// env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 流处理默认是这个 可以通过打印批和流的处理结果体会流和批的含义env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 将任务的并行度设置为2// env.setParallelism(2);// 通过这个获取系统的并行度int parallelism env.getParallelism();System.out.println(parallelism);// 获取数据 多态的写法 DataStreamSource 它是 DataStream 的子类DataStreamString dataStream01 env.fromElements(spark flink kafka, spark sqoop flink, kakfa hadoop flink);DataStreamString flatMapStream dataStream01.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String line, CollectorString collector) throws Exception {String[] arr line.split( );for (String word : arr) {// 循环遍历每一个切割完的数据放入到收集器中就可以形成一个新的DataStreamcollector.collect(word);}}});// 每一个算子也有自己的并行度一般跟系统保持一致System.out.println(flatMap的并行度flatMapStream.getParallelism());//flatMapStream.print();// Tuple2 指的是2元组DataStreamTuple2String, Integer mapStream flatMapStream.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String word) throws Exception {return Tuple2.of(word, 1); // (hello,1)}});DataStreamTuple2String, Integer sumResult mapStream.keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer tuple2) throws Exception {return tuple2.f0;}// 此处的1 指的是元组的第二个元组进行相加的意思}).sum(1);sumResult.print();// 执行env.execute();} }打包、上传 文件夹不需要提前准备好它可以帮我创建 提交我们自己开发打包的任务 flink run -c com.bigdata.day01.WordCount01 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar 去界面中查看运行结果 因为你这个是集群运行的所以标准输出流中查看假如第一台没有去第二台查看一直点。 获取主函数参数工具类 可以通过外部传参的方式给定一个路径 以下代码可以做到假如给定路径就获取路径的数据假如没给就读取默认数据 package com.bigdata.day01;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class WordCount02 {/*** 1. env-准备环境* 2. source-加载数据* 3. transformation-数据处理转换* 4. sink-数据输出* 5. execute-执行*/public static void main(String[] args) throws Exception {// 导入常用类时要注意 不管是在本地开发运行还是在集群上运行都这么写非常方便StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 这个是 自动 根据流的性质决定是批处理还是流处理//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 批处理流 一口气把数据算出来// env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 流处理默认是这个 可以通过打印批和流的处理结果体会流和批的含义env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 将任务的并行度设置为2// env.setParallelism(2);// 通过这个获取系统的并行度int parallelism env.getParallelism();System.out.println(parallelism);// 获取数据 多态的写法 DataStreamSource 它是 DataStream 的子类// 连着写的本质就是 因为每一个算子的返回值都是DataStream的子类所以可以这么写// 以下代码中路径是写死的能不能通过外部传参进来当然可以 agrsDataStreamString dataStream null;System.out.println(args.length);if(args.length !0){String path args[0];dataStream env.readTextFile(path);}else{dataStream env.fromElements(spark flink kafka, spark sqoop flink, kakfa hadoop flink);}dataStream.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String line, CollectorString collector) throws Exception {String[] arr line.split( );for (String word : arr) {// 循环遍历每一个切割完的数据放入到收集器中就可以形成一个新的DataStreamcollector.collect(word);}}}).map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String word) throws Exception {return Tuple2.of(word, 1); // (hello,1)}}).keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer tuple2) throws Exception {return tuple2.f0;}// 此处的1 指的是元组的第二个元组进行相加的意思}).sum(1).print();// 执行env.execute();} }flink run -c com.bigdata.day01.Demo02 FlinkDemo-1.0-SNAPSHOT.jar /home/wc.txt 这样做跟我们以前的做法还是不一样。以前的运行方式是这样的 flink run /opt/installs/flink/examples/batch/WordCount.jar --input /home/wc.txt 这个写法传递参数的时候带有--字样而我们的没有。 以上代码进行升级我想将参数前面追加一个 --input 这样怎么写 ParameterTool parameterTool ParameterTool.fromArgs(args); if(parameterTool.has(output)){path parameterTool.get(output); }在代码中的使用 ParameterTool parameterTool ParameterTool.fromArgs(args);String output ;if (parameterTool.has(output)) {output parameterTool.get(output);System.out.println(指定了输出路径使用: output);} else {output hdfs://node01:9820/wordcount/output47_;System.out.println(可以指定输出路径使用 --output ,没有指定使用默认的: output);} 升级过的代码 package com.bigdata.day01;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class WordCount02 {/*** 1. env-准备环境* 2. source-加载数据* 3. transformation-数据处理转换* 4. sink-数据输出* 5. execute-执行*/public static void main(String[] args) throws Exception {// 导入常用类时要注意 不管是在本地开发运行还是在集群上运行都这么写非常方便StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 这个是 自动 根据流的性质决定是批处理还是流处理//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 批处理流 一口气把数据算出来// env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 流处理默认是这个 可以通过打印批和流的处理结果体会流和批的含义env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 将任务的并行度设置为2// env.setParallelism(2);// 通过这个获取系统的并行度int parallelism env.getParallelism();System.out.println(parallelism);// 获取数据 多态的写法 DataStreamSource 它是 DataStream 的子类// 连着写的本质就是 因为每一个算子的返回值都是DataStream的子类所以可以这么写// 以下代码中路径是写死的能不能通过外部传参进来当然可以 agrsDataStreamString dataStream null;System.out.println(args.length);if(args.length !0){String path ;ParameterTool parameterTool ParameterTool.fromArgs(args);if(parameterTool.has(input)){path parameterTool.get(input);}else{path args[0];}dataStream env.readTextFile(path);}else{dataStream env.fromElements(spark flink kafka, spark sqoop flink, kakfa hadoop flink);}dataStream.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String line, CollectorString collector) throws Exception {String[] arr line.split( );for (String word : arr) {// 循环遍历每一个切割完的数据放入到收集器中就可以形成一个新的DataStreamcollector.collect(word);}}}).map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String word) throws Exception {return Tuple2.of(word, 1); // (hello,1)}}).keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer tuple2) throws Exception {return tuple2.f0;}// 此处的1 指的是元组的第二个元组进行相加的意思}).sum(1).print();// 执行env.execute();} }DataStream (Lambda表达式-扩展 了解) import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;import java.util.Arrays;/*** Desc 演示Flink-DataStream-流批一体API完成批处理WordCount* 使用Java8的lambda表示完成函数式风格的WordCount*/ public class WordCount02 {public static void main(String[] args) throws Exception {//TODO 1.env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);//指定计算模式为流//env.setRuntimeMode(RuntimeExecutionMode.BATCH);//指定计算模式为批env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//自动//不设置的话默认是流模式defaultValue(RuntimeExecutionMode.STREAMING)//TODO 2.source-加载数据DataStreamString dataStream env.fromElements(flink hadoop spark, flink hadoop spark, flink hadoop, flink);//TODO 3.transformation-数据转换处理//3.1对每一行数据进行分割并压扁/*public interface FlatMapFunctionT, O extends Function, Serializable {void flatMap(T value, CollectorO out) throws Exception;}*//*DataStreamString wordsDS dataStream.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String value, CollectorString out) throws Exception {String[] words value.split( );for (String word : words) {out.collect(word);}}});*///注意:Java8的函数的语法/lambda表达式的语法: (参数)-{函数体}DataStreamString wordsDS dataStream.flatMap((String value, CollectorString out) - {String[] words value.split( );for (String word : words) {out.collect(word);}}).returns(Types.STRING);//3.2 每个单词记为单词,1/*public interface MapFunctionT, O extends Function, Serializable {O map(T value) throws Exception;}*//*DataStreamTuple2String, Integer wordAndOneDS wordsDS.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {return Tuple2.of(value, 1);}});*/DataStreamTuple2String, Integer wordAndOneDS wordsDS.map((String value) - Tuple2.of(value, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));//3.3分组//注意:DataSet中分组用groupBy,DataStream中分组用keyBy//KeyedStreamTuple2String, Integer, Tuple keyedDS wordAndOneDS.keyBy(0);/*public interface KeySelectorIN, KEY extends Function, Serializable {KEY getKey(IN value) throws Exception;}*//*KeyedStreamTuple2String, Integer, String keyedDS wordAndOneDS.keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer value) throws Exception {return value.f0;}});*/KeyedStreamTuple2String, Integer, String keyedDS wordAndOneDS.keyBy((Tuple2String, Integer value) - value.f0);//3.4聚合SingleOutputStreamOperatorTuple2String, Integer result keyedDS.sum(1);//TODO 4.sink-数据输出result.print();//TODO 5.execute-执行env.execute();} } 此处有一个大坑就是使用完lambda表达式以后需要添加一个returns(Types.STRING); 否则报错这样的话使用lambda也不是特别快了。 连着写的版本如下 package com.bigdata.day01;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class WordCount03 {/*** 1. env-准备环境* 2. source-加载数据* 3. transformation-数据处理转换* 4. sink-数据输出* 5. execute-执行*/public static void main(String[] args) throws Exception {// 导入常用类时要注意 不管是在本地开发运行还是在集群上运行都这么写非常方便StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 这个是 自动 根据流的性质决定是批处理还是流处理//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 批处理流 一口气把数据算出来// env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 流处理默认是这个 可以通过打印批和流的处理结果体会流和批的含义//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 将任务的并行度设置为2// env.setParallelism(2);// 通过这个获取系统的并行度int parallelism env.getParallelism();System.out.println(parallelism);// 获取数据 多态的写法 DataStreamSource 它是 DataStream 的子类// 连着写的本质就是 因为每一个算子的返回值都是DataStream的子类所以可以这么写// 以下代码中路径是写死的能不能通过外部传参进来当然可以 agrsDataStreamString dataStream null;System.out.println(args.length);if(args.length !0){String path ;ParameterTool parameterTool ParameterTool.fromArgs(args);if(parameterTool.has(input)){path parameterTool.get(input);}else{path args[0];}dataStream env.readTextFile(path);}else{dataStream env.fromElements(spark flink kafka, spark sqoop flink, kakfa hadoop flink);}dataStream.flatMap((String line, CollectorString collector) - {String[] arr line.split( );for (String word : arr) {// 循环遍历每一个切割完的数据放入到收集器中就可以形成一个新的DataStreamcollector.collect(word);}}).returns(Types.STRING).map((String word)- {return Tuple2.of(word, 1); // (hello,1)}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy((Tuple2String, Integer tuple2)- {return tuple2.f0;}).sum(1).print();// 执行env.execute();} }
http://www.zqtcl.cn/news/200588/

相关文章:

  • 网站建站网站91955内蒙古建设集团招聘信息网站
  • 巴中建设厅网站电话seo是网络优化吗
  • 中国做投资的网站做网站的公司怎么推广
  • 专业的广州微网站建设移动应用开发干什么的
  • 网站运营有什么用常熟智能网站开发
  • 如何组建做网站的团队绍兴网站建设推广
  • 资讯类响应式网站模板深圳网站建设培训机构
  • 电子商务网站功能设计3d动画制作过程
  • 随机网站生成器win7asp+sql server 2008做网站
  • 金本网站建设设计江苏建筑业网
  • 校园网站建设的作用淄博网站建设网站推广优化
  • 域名过期了怎么办怎么找回网站校友录网站开发设计
  • 医疗 企业 网站建设seo网络优化是什么工作
  • e时代速递搜索引擎网站建设aso关键词搜索优化
  • 产品单页营销型网站模板龙华网站建设深圳信科
  • 建网站平台要多少钱投资公司取名字大全
  • 建设网站需要哪些设备重庆本地建站
  • 学做家常菜去那个网站专业制作网站制作
  • 合肥网站建设公网站程序如何上传
  • 潍坊网站建设招聘官方网站建设 在线磐石网络
  • 校友网站建设开一个网站的流程
  • 商业门户网站是什么意思哪家培训机构学校好
  • 青岛企业网站制作seo排名优化培训网站
  • 2018做网站还是app上海搜索seo
  • 网站建设用模板好吗罗湖网站制作费用
  • 网站图片延时加载app推广视频
  • 郑州设计师网站个人搭建网站要多少钱
  • 网站制作成品下载wordpress怎么更改样式
  • 河北省城乡和建设厅网站首页网站维护属于什么部门
  • 西安建网站公司哪家好网站导航条设计欣赏