当前位置: 首页 > news >正文

郑州网站设计制作价格广东省建设厅官网证件查询

郑州网站设计制作价格,广东省建设厅官网证件查询,ti外包网站建设,长沙人才招聘网一、什么是增量聚合函数 在Flink Window中定义了窗口分配器#xff0c;我们只是知道了数据属于哪个窗口#xff0c;可以将数据收集起来了#xff1b;至于收集起来到底要做什么#xff0c;其实还完全没有头绪#xff0c;这也就是窗口函数所需要做的事情。所以在窗口分配器…一、什么是增量聚合函数 在Flink Window中定义了窗口分配器我们只是知道了数据属于哪个窗口可以将数据收集起来了至于收集起来到底要做什么其实还完全没有头绪这也就是窗口函数所需要做的事情。所以在窗口分配器之后我们还要再接上一个定义窗口如何进行计算的操作这就是所谓的“窗口函数”window functions。 窗口可以将数据收集起来最基本的处理操作当然就是基于窗口内的数据进行聚合。 我们可以每来一个数据就在之前结果上聚合一次这就是“增量聚合”。 典型的增量聚合函数有两个ReduceFunction 和 AggregateFunction。 二、归约函数ReduceFunction 源码解析 FunctionalInterface Public public interface ReduceFunctionT extends Function, Serializable {T reduce(T var1, T var2) throws Exception; }实际案例 在Flink中使用socket模拟实时的数据流DataStream通过定义一个滚动窗口窗口的大小为10s按照id分区使用reduce聚合函数实现value的累加统计 package com.flink.DataStream.WindowFunctions;import com.flink.POJOs.WaterSensor; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time;public class FlinkWindowReduceFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);DataStreamSourceString streamSource streamExecutionEnvironment.socketTextStream(localhost, 8888);// 注意这里为什么返回的是KeyedStream(建控流/分区流)而不是DataStreamKeyedStreamWaterSensor, String keyedStream streamSource// 使用map函数将输入的string转为一个WaterSensor类.map(new MapFunctionString, WaterSensor() {Overridepublic WaterSensor map(String s) throws Exception {// 这里写的很详细如何把string转为的WaterSensor类String[] strings s.split(,);String id strings[0];Long ts Long.valueOf(strings[1]);Integer vc Integer.valueOf(strings[2]);WaterSensor waterSensor new WaterSensor();waterSensor.setId(id);waterSensor.setTs(ts);waterSensor.setVc(vc);return waterSensor;//return new WaterSensor(strings[0],Long.valueOf(strings[1]),Integer.valueOf(strings[2])}})// 按照id做keyBy分区提问KeyBy是如何实现分区的.keyBy(new KeySelectorWaterSensor, String() {// 也可以直接使用lamda表达式更简单Overridepublic String getKey(WaterSensor waterSensor) throws Exception {// getId()方法就是return的waterSensor.idreturn waterSensor.getId();}});/*** 窗口操作主要有两个部分窗口分配器Window Assigners和窗口函数WindowFunctions* .window()方法需要传入一个窗口分配器,它指明了窗口的类型* */SingleOutputStreamOperatorWaterSensor outputStreamOperator keyedStream// 设置滚动窗口的大小10秒.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))// 使用匿名函数实现增量聚合函数ReduceFunction.reduce(new ReduceFunctionWaterSensor() {Overridepublic WaterSensor reduce(WaterSensor waterSensor1, WaterSensor waterSensor2) throws Exception {System.out.println(调用reduce方法之前的结果: waterSensor1 ,现在来的数据: waterSensor2);return new WaterSensor(waterSensor1.getId(), System.currentTimeMillis(), waterSensor1.getVc() waterSensor2.getVc());}});outputStreamOperator.print();streamExecutionEnvironment.execute();} }启动Flink程序启动nc模拟输入 nc -lk 8888 # 00-10秒输入 a,11111,1 # 11-20秒输入 a,11111,2 a,22222,3 # 21-30秒输入 a,11111,4查看控制台打印结果 WaterSensor{ida, ts11111, vc1} 调用reduce方法之前的结果:WaterSensor{ida, ts11111, vc2},现在来的数据:WaterSensor{ida, ts22222, vc3} WaterSensor{ida, ts1702022598011, vc5} WaterSensor{ida, ts11111, vc4}三、聚合函数AggregateFunction 虽然ReduceFunction 可以解决大多数归约聚合的问题但是我们通过上述案例可以发现这个接口有一个限制就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。 Flink Window API 中的 aggregate 就突破了这个限制可以定义更加灵活的窗口聚合操作。这个方法需要传入一个 AggregateFunction 的实现类作为参数。AggregateFunction 可以看作是 ReduceFunction 的通用版本这里有三种类型输入类型IN、累加器类型ACC和输出类型OUT。输入类型 IN 就是输入流中元素的数据类型累加器类型 ACC 则是我们进行聚合的中间状态类型而输出类型当然就是最终计算结果的类型了。 源码解析 PublicEvolving public interface AggregateFunctionIN, ACC, OUT extends Function, Serializable {ACC createAccumulator();ACC add(IN var1, ACC var2);OUT getResult(ACC var1);ACC merge(ACC var1, ACC var2); }接口中有四个方法 1.createAccumulator() 创建一个累加器这就是为聚合创建了一个初始状态每个聚合任务只会调用一次。 2.add() 将输入的元素添加到累加器中。 3.getResult() 从累加器中提取聚合的输出结果。 4.merge() 合并两个累加器并将合并后的状态作为一个累加器返回。 所以可以看到AggregateFunction 的工作原理是首先调用 createAccumulator()为任务初始化一个状态累加器而后每来一个数据就调用一次 add()方法对数据进行聚合得到的结果保存在状态中等到了窗口需要输出时再调用 getResult()方法得到计算结果。很明显与 ReduceFunction 相同AggregateFunction 也是增量式的聚合而由于输入、中间状态、输出的类型可以不同使得应用更加灵活方便。 案例解析 package com.flink.DataStream.WindowFunctions;import com.flink.POJOs.WaterSensor; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class FlinkWindowAggregateFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);DataStreamSourceString streamSource streamExecutionEnvironment.socketTextStream(localhost, 8888);KeyedStreamWaterSensor, String keyedStream streamSource.map(new MapFunctionString, WaterSensor() {Overridepublic WaterSensor map(String s) throws Exception {String[] split s.split(,);return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));}}).keyBy((KeySelectorWaterSensor, String) waterSensor - waterSensor.getId());// 窗口分配器WindowedStreamWaterSensor, String, TimeWindow windowAssigner keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(20)));SingleOutputStreamOperatorString aggregate windowAssigner.aggregate(new AggregateFunctionWaterSensor, Integer, String() {Overridepublic Integer createAccumulator() {System.out.println(创建累加器);return 0;}Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println(调用add方法,value value);return accumulator value.getVc();}Overridepublic String getResult(Integer accumulator) {System.out.println(调用getResult方法);return accumulator.toString();}Overridepublic Integer merge(Integer integer, Integer acc1) {System.out.println(调用merge方法);return null;}});aggregate.print();streamExecutionEnvironment.execute();}}启动Flink程序启动nc模拟数据流输入 创建累加器 调用add方法,valueWaterSensor{ida, ts1111, vc1} 调用getResult方法 1 创建累加器 调用add方法,valueWaterSensor{ida, ts1111, vc2} 调用add方法,valueWaterSensor{ida, ts1111, vc3} 调用add方法,valueWaterSensor{ida, ts1111, vc4} 调用getResult方法 9 创建累加器 调用add方法,valueWaterSensor{ida, ts1111, vc5} 调用getResult方法 5
http://www.zqtcl.cn/news/284160/

相关文章:

  • 贵州萝岗seo整站优化鲜花店网站建设的总结
  • 下载做网站的软件建网站做站在
  • 无锡高端网站建设公司WordPress臃肿主题
  • 网站建设与运营财务预算seo下拉优化
  • 重庆铜梁网站建设价格阜城网站建设价格
  • 怎样建置换平台网站公众号开发周期
  • 朝阳建设网站什么是网络设计方案网络设计的原则有哪些
  • 长春商城网站制作二级网站建设 知乎
  • 网站建设的结论沭阳县建设局网站
  • 镇江网站制作价格网络有限公司简介
  • 海淀网站建设哪家公司好wordpress非常卡
  • 门户网站的建设意义交互设计专业就业前景
  • 那里有学做网站的2345网址导航下载官网
  • 房产证查询系统官方网站购买网站域名
  • 高端企业门户网站建设服务公司深圳企业网站怎么做
  • 页游网站如何做推广平面图设计软件有哪些
  • 自建网站有哪些wordpress 评论增加字段
  • 企业网站建设的方案书pc网站 公众号数据互通
  • 东莞设计制作网站制作做的asp网站手机号码
  • 必须做网站等级保护网站软件免费下载安装
  • 广州天河 网站建设上海招标网站
  • 云南网站建设方案专业的徐州网站开发
  • 政务服务 网站 建设方案郑州网站建设公司电话多少
  • 优化网站浏览量怎么看建设网站公司专业服务
  • php做的网站预览单产品网站建设
  • 网站文件验证上海推广网站公司
  • 如何免费申请网站外贸工艺品网站建设
  • 有名的wordpress网站网站开发企业培训
  • 中国建设银行绑定网站南宁seo如何做
  • 饮食类网站律师资格证报考条件