当前位置: 首页 > news >正文

jsp网站开发目的及意义建工报名网

jsp网站开发目的及意义,建工报名网,新闻宣传wordpress主题,网站开发要求有哪些文章目录 1. 源算子 Source1. 从集合读2. 从文件读取3. 从 socket 读取4. 从 kafka 读取5. 从数据生成器读取数据 2. 转换算子基本转换算子#xff08;map/ filter/ flatMap#xff09; 1. 源算子 Source Flink可以从各种来源获取数据#xff0c;然后构建DataStream进行转换… 文章目录 1. 源算子 Source1. 从集合读2. 从文件读取3. 从 socket 读取4. 从 kafka 读取5. 从数据生成器读取数据 2. 转换算子基本转换算子map/ filter/ flatMap 1. 源算子 Source Flink可以从各种来源获取数据然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源data source而读取数据的算子就是源算子source operator。所以source就是我们整个处理程序的输入端。 在Flink1.12以前旧的添加source的方式是调用执行环境的addSource()方法 DataStream stream env.addSource(…); 方法传入的参数是一个“源函数”source function需要实现SourceFunction接口。 从Flink1.12开始主要使用流批统一的新Source架构 DataStreamSource stream env.fromSource(…) Flink直接提供了很多预实现的接口此外还有很多外部连接工具也帮我们实现了对应的Source通常情况下足以应对我们的实际需求。 1. 从集合读 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 1. 从集合读 // DataStreamSourceInteger source env.fromCollection(Arrays.asList(1, 2, 3));// 2. 直接填元素DataStreamSourceInteger source env.fromElements(1, 2, 3, 4);source.print();env.execute();}2. 从文件读取 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/version/dependency public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();FileSourceString source FileSource.forRecordStreamFormat(new TextLineInputFormat(),new Path(input/world.txt)).build();env.fromSource(source, WatermarkStrategy.noWatermarks(), fileSource).print();env.execute();}3. 从 socket 读取 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString source env.socketTextStream(localhost, 7777);source.print();env.execute();}可以使用 nc -l 7777创建一个监听链接的 tcp 4. 从 kafka 读取 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version/dependencypublic static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();KafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop102:9092).setTopics(topic_1).setGroupId(atguigu).setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()) .build();DataStreamSourceString stream env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), kafka-source);stream.print(Kafka);env.execute();}5. 从数据生成器读取数据 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-datagen/artifactIdversion${flink.version}/version/dependencypublic static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataGeneratorSourceString dataGeneratorSource new DataGeneratorSource(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return Number: value;}}, 10, // 自动生成的数字序列RateLimiterStrategy.perSecond(10), // 限速策略每秒生成10条Types.STRING // 返回类型);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), datagenerator).print();env.execute();}2. 转换算子 数据源读入数据之后我们就可以使用各种转换算子将一个或多个DataStream转换为新的DataStream。 基本转换算子map/ filter/ flatMap map是大家非常熟悉的大数据操作算子主要用于将数据流中的数据进行转换形成新的数据流。简单来说就是一个“一一映射”消费一个元素就产出一个元素。 filter转换操作顾名思义是对数据流执行一个过滤通过一个布尔条件表达式设置过滤条件对于每一个流内元素进行判断若为true则元素正常输出若为false则元素被过滤掉。 flatMap操作又称为扁平映射主要是将数据流中的整体一般是集合类型拆分成一个一个的个体使用。 :::info 消费一个元素可以产生0到多个元素。 ::: flatMap可以认为是“扁平化”flatten和“映射”map两步操作的结合也就是先按照某种规则对数据进行打散拆分再对拆分后的元素做转换处理。
http://www.zqtcl.cn/news/352862/

相关文章:

  • 网站搜索排名优化软件flash xml网站
  • 匀贵网站建设亿级别网站开发注意
  • 怎样架设网站网站优化公司推荐
  • iis网站防盗链济宁官方网站
  • 网址查询地址查询站长之家在海南注册公司需要什么条件
  • 网站开发兼职平台网站建设需要多少钱小江网页设计
  • 最专业的网站建设收费2021没封的网站有人分享吗
  • 站酷设计网站官网入口文字设计wordpress是服务器吗
  • 律师手机网站模板天津做推广的公司
  • 西安市高新区建设规划局网站织梦小说网站模板下载地址
  • 网站开发简历 自我评价网页设计报告论文
  • 如何让网站不被收录不备案 国内网站
  • 站长之家域名买天猫店铺去哪里买
  • asp.net做的网站模板下载万网x3 wordpress
  • 设计网站设计目标天津市建设工程管理总队网站
  • 网站开始怎么做上海响应式网页建设
  • 网站备案 seo免费二维码制作网站
  • 删除网站备案网站建设湖南岚鸿建设
  • 做vlogger的网站有哪些长沙网站排名技巧
  • 媒体营销平台商品seo关键词优化
  • 芜湖先锋网站两学一做wordpress菜单顶部
  • 网站策划怎么样一级域名网站如何申请
  • 烟台高端网站开发网站开发哪个公司好
  • 广州网站定制开发方案南宁网站 制作
  • php做网站需要后台吗郑州建网站十大
  • 网站跳出率是什么意思百度服务
  • 建站 discuz开发者导航
  • 有哪些网站可以做毕业设计外贸网站发外链
  • 如何使用网站模板计算机培训班有用吗
  • 本地宁波网站建设电子商务网站建设工具都有那些