当前位置: 首页 > news >正文

做泥网站关于加强公司网站建设的通知

做泥网站,关于加强公司网站建设的通知,做广告推广哪家好,中国人才网登录入口一、window 概述 ​ Streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎#xff0c;而无限数据集是指一种不断增长的本质上无限的数据集#xff0c;而 Flink window 是一种将无限数据切割为有限块进行处理的手段。window 是无限数据流处理的核心#xff0c; …一、window 概述 ​ Streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎而无限数据集是指一种不断增长的本质上无限的数据集而 Flink window 是一种将无限数据切割为有限块进行处理的手段。window 是无限数据流处理的核心 window 将一个无限的 stream 拆分成有限大小的 ”buckets” 桶然后可以在这些桶上做计算操作 二、window 类型 1. Time Window 时间窗口按照时间生成 Window 1.1 Tumbling Time Window 滚动时间窗口 将数据依据固定的窗口长度(时间)对数据进行切片特点时间对齐窗口长度固定没有重叠重要参数窗口长度(时间值)适用场景适合做 BI 统计等做每个时间段的聚合计算 1.2 Sliding Time Window 滑动时间窗口 滑动时间窗口由固定的窗口长度和滑动间隔组成特点时间对齐窗口长度固定可以有重叠数据最大的重叠数 窗口长度/滑动间隔重要参数窗口长度和滑动间隔(时间值)适用场景对最近一个时间段内的统计求某接口最近 5min 的失败率来决定是否要报警 1.3 Session Window 会话时间窗口 由一系列事件组合一个指定时间长度的 timeout 间隙组成类似于 web 应用的 session也就是一段时间没有接收到新数据就会生成新的窗口特点时间无对齐重要参数会话最小时间间隔 2. Count Window 计数窗口按照指定的数据条数生成一个 Window与时间无关 2.1 Tumbling Count Window 滚动计数窗口 将数据依据固定的窗口长度(计数)对数据进行切片特点计数对齐窗口长度固定没有重叠重要参数窗口长度(计数值) 2.2 Sliding Count Window 滑动计数窗口 滑动计数窗口由固定的窗口长度和滑动间隔组成特点计数对齐窗口长度固定可以有重叠数据最大的重叠数 窗口长度/滑动间隔重要参数窗口长度和滑动间隔(计数值) 三、window API 操作 1. Window 创建 1.1 非按键分区流 原始的 DataStream 调用 windowAll() 方法创建的窗口只能在一个任务task上执行相当于并行度变成了 1生产上不建议使用 AllWindowedStream stream dataStream.windowAll()1.2 按键分区流 Window 的创建推荐是 DataStream 经过 KeyBy 之后调用 window() 方法 /**通用开窗方法WindowedStreamT window()参数WindowAssignerFlink 提供的通用 WindowAssigner1.滚动窗口tumbling window2.滑动窗口sliding window3.会话窗口session window4.全局窗口global window */ public class TestWindowCreate {public static void main(String[] args) throw Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//读取文本数据/*sensorReading.txtsensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1 */DataStreamString inputStream env.readTextFile(sensorReading.txt);DataStreamSensorReading dataStream inputStream.map(new MapFunctionString, SensorReading(){Overridepublic SensorReading map(String value) throws Exception {String[] fields value.split(,);return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});//创建窗口//1.滚动时间窗口//1.1 使用通用 window() 方法dataStream.keyBy(id).window(TumblingProcessTimeWindows.of(Time.seconds(5)));//1.2 使用 timeWindow() 方法dataStream.keyBy(id).timeWindow(Time.seconds(5));//2.滑动时间窗口//2.1 使用通用 window() 方法dataStream.keyBy(id).window(SlidingProcessTimeWindows.of(Time.seconds(6), Time.seconds(2)));//2.2 使用 timeWindow() 方法dataStream.keyBy(id).timeWindow(Time.seconds(6), Time.seconds(2));//3.会话窗口dataStream.keyBy(id).window(EventTimeSessionWindows.withGap(Time.minutes(1)));//4.计数窗口//4.1 滚动计数窗口dataStream.keyBy(id).countWindow(10L);//4.2 滑动计数窗口dataStream.keyBy(id).countWindow(10L, 2L);env.execute();} }2. Window 函数 window function 定义了要对窗口中收集的数据做的计算操作 2.1 增量聚合函数 incremental aggregation functions每条数据到来就进行计算保持一个简单的状态窗口结束时输出最终的状态。简单的 sum/max/maxBy/min/minBy 聚合函数都是增量聚合 2.1.1 ReduceFunction /**方法签名reduce(ReduceFunctionT reduce)注意ReduceFunction 的类型 T 不能改变 */ public class TestWindowFunction {public static void main(String[] args) throw Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//读取文本数据/*sensorReading.txtsensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1 */DataStreamString inputStream env.readTextFile(sensorReading.txt);DataStreamSensorReading dataStream inputStream.map(new MapFunctionString, SensorReading(){Overridepublic SensorReading map(String value) throws Exception {String[] fields value.split(,);return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});//创建窗口并使用窗口函数dataStream.keyBy(id).timeWindow(Time.seconds(5)).reduce(new ReduceFunctionSenesorReading() {Overridepublic SenesorReading reduce(SenesorReading value1, SenesorReading value2) throws Exception {return value2;}}).print();env.execute();} }2.1.2 AggregateFunction /**方法签名aggregate(AggregateFunctionIN, ACC, OUT aggregate)AggregateFunction 的 3 个泛型1.IN输入数据类型2.ACC中间累加器的数据类型3.OUT输出数据类型AggregateFunction 接口中需要实现的 4 个方法1.createAccumulator()创建一个累加器即为聚合创建了一个初始状态每个聚合任务只会调用一次2.add()将输入的元素添加到累加器中。基于聚合状态对新来的数据进行进一步聚合的过程。方法传入两个参数当前新到的数据 value 和当前的累加器accumulator返回一个新的累加器值是对聚合状态进行更新。每条数据到来之后都会调用这个方法3.getResult()从累加器中提取聚合的输出结果。可以定义多个状态然后再基于这些聚合的状态计算出一个结果进行输出。比如计算平均值可以把 sum 和 count 作为状态放入累加器而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用4.merge()合并两个累加器并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用最常见的合并窗口Merging Window的场景就是会话窗口Session Windows */ public class TestWindowFunction {public static void main(String[] args) throw Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//读取文本数据/*sensorReading.txtsensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1 */DataStreamString inputStream env.readTextFile(sensorReading.txt);DataStreamSensorReading dataStream inputStream.map(new MapFunctionString, SensorReading(){Overridepublic SensorReading map(String value) throws Exception {String[] fields value.split(,);return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});//创建窗口并使用窗口函数dataStream.keyBy(id).timeWindow(Time.seconds(15)).aggregate(new AggregateFunctionSenesorReading, Integer, Integer() {Overridepublic Integer createAccumulator() { return 0;}Overridepublic Integer add(SenesorReading value, Integer accumulator) {return accumulator 1;}Overridepublic Integer getResult(Integer accumulator) {return accumulator;}Overridepublic Integer merge(Integer a, Integer b) {return a b;}}).print();env.execute();} }2.2 全窗口函数 full window functions先收集窗口中的每一条数据并在内部缓存起来等到窗口要输出结果的时候再将所有数据进行计算并输出 2.2.1 WindowFunction /**方法签名apply(WindowFunctionIN, OUT, KEY, W extends Window window)泛型1.IN输入数据类型2.OUT输出数据类型3.KEY分组 key 的类型4.W窗口的类型需要实现的方法void apply(KEY key, W window, IterableIN input, CollectorOUT out)1.key分区的 key2.window当前窗口信息3.input窗口所有数据的可迭代集合4.out数据收集器 */ public class TestFullWindowFunction {public static void main(String[] args) throw Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//读取文本数据/*sensorReading.txtsensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1 */DataStreamString inputStream env.readTextFile(sensorReading.txt);DataStreamSensorReading dataStream inputStream.map(new MapFunctionString, SensorReading(){Overridepublic SensorReading map(String value) throws Exception {String[] fields value.split(,);return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});//创建窗口并使用窗口函数dataStream.keyBy(id).timeWindow(Time.seconds(15)).apply(new WindowFunctionSenesorReading, Tuple3String, Long, Integer, Tuple, TimeWindow() {Overridepublic void apply(Tuple key, TimeWindow window, IterableSensorReading input, CollectorTuple3String, Long, Integer out) throws Exception { String id key.getField(0);Long windowEnd window.getEnd();Integer count IteratorUtils.toList(input.iterator()).size();out.collect(new Tuple3(id, windowEnd, count));}}).print();env.execute();} }2.2.2 ProcessWindowFunction /**方法签名process(ProcessWindowFunctionIN, OUT, KEY, W extends Window window)泛型1.IN输入数据类型2.OUT输出数据类型3.KEY分组 key 的类型4.W窗口的类型需要实现的方法void process(KEY key, Context context, IterableIN elements, CollectorOUT out)1.key分区的 key2.context上下文环境对象3.input窗口所有数据的可迭代集合4.out数据收集器 */ public class TestFullWindowFunction {public static void main(String[] args) throw Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//读取文本数据/*sensorReading.txtsensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1 */DataStreamString inputStream env.readTextFile(sensorReading.txt);DataStreamSensorReading dataStream inputStream.map(new MapFunctionString, SensorReading(){Overridepublic SensorReading map(String value) throws Exception {String[] fields value.split(,);return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});//创建窗口并使用窗口函数dataStream.keyBy(id).timeWindow(Time.seconds(15)).process(new ProcessWindowFunctionSenesorReading, Tuple3String, Long, Integer, Tuple, TimeWindow() {Overridepublic void process(Tuple key, Context context, IterableSensorReading input, CollectorTuple3String, Long, Integer out) throws Exception { String id key.getField(0);Long windowEnd context.window().getEnd();Integer count IteratorUtils.toList(input.iterator()).size();out.collect(new Tuple3(id, windowEnd, count));}}).print();env.execute();} }3. 其他可选 API 3.1 trigger 触发器主要是用来控制窗口什么时候触发计算即执行窗口函数 /**参数Trigger 抽象类内置实现类EventTimeTrigger、ProcessingTimeTrigger 和 CountTrigger 等自定义实现类继承 Trigger 抽象类并重写方法1.onElement()窗口中每到来一个元素都会调用这个方法2.onEventTime()当注册的事件时间定时器触发时将调用这个方法3.onProcessingTime()当注册的处理时间定时器触发时将调用这个方法4.clear()当窗口关闭销毁时调用这个方法。一般用来清除自定义的状态 */ trigger(Trigger trigger)3.2 evictor 移除器主要用来定义移除某些数据的逻辑 /**参数Evictor 接口实现方法1.evictBefore()定义执行窗口函数之前的移除数据操作2.evictAfter()定义执行窗口函数之后的以处数据操作注意默认情况下预实现的移除器都是在执行窗口函数window fucntions之前移除数据的 */ evictor(Evictor evictor)3.3 allowedLateness 允许延迟的数据设定允许延迟一段时间在这段时间内窗口不会销毁继续到来的数据依然可以进入窗口中并触发计算更新结果。直到水位线推进到了 窗口结束时间 延迟时间才真正将窗口的内容清空正式关闭窗口 /**方法签名 */ allowedLateness(Time time)3.4 sideOutputLateData 将迟到的数据放入侧输出流可以将未收入窗口的迟到数据放入“侧输出流”side output进行另外的处理。所谓的侧输出流相当于是数据流的一个“分支”这个流中单独放置那些错过了该上的车、本该被丢弃的数据 /**参数OutputTag 输出标签用来标记分支的迟到数据流 */ sideOutputLateData(OutputTagT outputTag)//实例化方式 OutputTagString outputTag new OutputTagString(late) {};//提取侧输出流方法由执行完所有窗口函数后得到的 DataStream 调用 getSideOutput(OutputTagT outputTag)
http://www.zqtcl.cn/news/795763/

相关文章:

  • 龙岩网站定制网站开发 技术路线
  • 广州制作网站开发网站标题怎么设置
  • 海南旅游网站开发背景做网站兼容ie
  • 查找人网站 优帮云本地升级wordpress
  • 安庆什么网站好小事做wordpress主题vue
  • 高端商品网站网络运维工程师面试题及答案
  • 做网站的dw全称是啥适合迷茫年轻人的工作
  • 免费软件库合集软件资料网站wordpress go链接跳转错误
  • 重庆那里做网站外包好和镜像网站做友链
  • 网站栏目关键词装修效果图制作软件
  • 企业网站开发公司-北京公司北京医疗网站建设公司
  • 可以做配音兼职的网站产品网站怎样做外部链接
  • 如何制作网站效果图做外单要上什么网站
  • 网站开发预算编制网站可以制作ios
  • 强化网站建设网页翻译怎么弄出来
  • 长春火车站到龙嘉机场高铁时刻表视频网站建设公司排名
  • 武进网站建设代理商google官网下载
  • 简单网站开发流程图知乎怎么申请关键词推广
  • 成寿寺网站建设公司文登区做网站的公司
  • 建设一个网站用什么软件下载阿里外贸平台网站建设
  • 可信网站myeclipse网站开发
  • 做设计找素材的 网站有哪些网站建设实训个人总结
  • 浙江省建设厅继续教育官方网站网站做vr的收费
  • 建造网站 备案苏州手机网站设计
  • 做外贸卖小商品是哪个网站手机首页设计
  • 大连网站制作公司营销策划公司有哪些职位
  • 2019深圳网站设计公司排名网站设计的思想
  • 试客那个网站做的好seo管理平台
  • 增加网站关键词库网盟推广合作
  • 企业门户网站内容建设濮阳网络培训基地