一些做的好的网站,网站模板设计报价单,wordpress 新浪微博,wordpress收购link 在开窗处理事件时间(Event Time) 数据时#xff0c;可设置水印延迟以及设置窗口允许延迟(allowedLateness)以保证数据的完整性。这两者因都是设置延迟时间所以刚接触时容易混淆。本文接下将展开讨论分析“水印延迟”与“窗口允许延迟”概念及区别。水印延迟(WaterMark)(1…link 在开窗处理事件时间(Event Time) 数据时可设置水印延迟以及设置窗口允许延迟(allowedLateness)以保证数据的完整性。这两者因都是设置延迟时间所以刚接触时容易混淆。本文接下将展开讨论分析“水印延迟”与“窗口允许延迟”概念及区别。水印延迟(WaterMark)(1) 水印由于采用了事件时间脱离了物理挂钟。窗口不知道什么时候需要关闭并进行计算这个时候需要借助水印来解决该问题。当窗口遇到水位标识时就默认是窗口时间段内的数据都到齐了可以触发窗口计算。(2) 水印延迟设置水印延迟时间的目的是让水印延迟到达从而可以解决乱序问题。通过水印延迟到达让在延迟时间范围内到达的迟到数据可以加入到窗口计算中保证了数据的完整性。当水印到达后就会触发窗口计算在水印之后到达的迟到数据则会被丢弃。窗口允许延迟(allowedLateness)使用 StreamAPI 时在进行开窗后可设置 allowedLateness 窗口延迟。官网中对其解释如下默认情况下当水印到达窗口末端时迟到元素将会被删除。但Flink允许为window operators指定允许的最大延迟。允许延迟指定元素在被删除之前延迟的时间默认值为0。当元素在水印经过窗口末端后到达且它的到达时间在窗口末端加上运行延迟的时间之内其仍会被添加到窗口中。根据所使用的触发器延迟但未被丢弃的元素可能会再次触发窗口计算。EventTimeTrigger就是这种情况。为了做到这一点Flink保持窗口的状态直到它们允许的延迟到期。一旦发生这种情况Flink将删除窗口并删除其状态正如窗口生命周期部分中所描述的那样。简单理解通常在水印到达之后迟到数据将会被删除而窗口的延迟则是指数据在被删除之前的允许保留时间。也就是说在水印达到之后迟到数据本该被删除但是如果设置了窗口延迟那么在水印之后到窗口延迟时间段内到达的迟到数据还是会被加入到窗口计算中并再次触发窗口计算。一个Demo 两个猜想下面我用一个 Demo 和两个猜想来帮助大家加深理解这两个概念。例子接收 Kafka 数据数据为 JSON 格式如{word:a,count:1,time:1604286564}。我们开一个 5 秒的 tumbling windows 滚动窗口以 word 作为 key 在窗口内对 count 值进行累加。同时设置水印延迟 2 秒窗口延迟 2 秒。代码如下public class MyExample {public static void main(String[] args) throws Exception {// 创建环境StreamExecutionEnvironment envStreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 设置时间特性为env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 水印策略其需要注入Timestamp Assigner(描述了如何访问事件时间戳)和 Watermark Generator (事件流显示的超出正常范围的程度)WatermarkStrategywatermarkStrategyWatermarkStrategy// forBoundedOutOfOrderness 属于(periodic周期性)周期生成器通常通过onEvent()观察传入的事件然后在框架调用onPeriodicEmit()时发出水印。.forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner() {Overridepublic long extractTimestamp(WC wc, long l) {return wc.getEventTime() * 1000;}});// Kafka 配置Properties propertiesnewProperties();properties.setProperty(bootstrap.servers, Kafka地址:9092);properties.setProperty(group.id, test);// Flink 需要知道如何转换Kafka消息为Java对象(反序列化)默认提供了 KafkaDeserializationSchema(序列化需要自己编写)、JsonDeserializationSchema、AvroDeserializationSchema、TypeInformationSerializationSchemaenv.addSource(new FlinkKafkaConsumer(flinktest1, new JSONKeyValueDeserializationSchema(true), properties).setStartFromLatest())// map 构建 WC 对象.map(new MapFunction() {Overridepublic WC map(ObjectNode jsonNode) throws Exception {JsonNode valueNodejsonNode.get(value);WC wcnewWC(valueNode.get(word).asText(),valueNode.get(count).asInt(),valueNode.get(time).asLong());return wc;}})// 设定水印策略.assignTimestampsAndWatermarks(watermarkStrategy).keyBy(WC::getWord)// 窗口设置这里设置为滚动窗口.window(TumblingEventTimeWindows.of(Time.seconds(5)))// 设置窗口延迟.allowedLateness(Time.seconds(2)).reduce(new ReduceFunction() {Overridepublic WC reduce(WC wc, WC t1) throws Exception {return new WC(wc.getWord(), wc.getCount() t1.getCount());}}).print();env.execute();}static class WC {public String word;public int count;public long eventTime;public long getEventTime() {return eventTime;}public void setEventTime(long eventTime) {this.eventTime eventTime;}public String getWord() {return word;}public void setWord(String word) {this.word word;}public int getCount() {return count;}public void setCount(int count) {this.count count;}public WC(String word, int count) {this.word word;this.count count;}public WC(String word, int count,long eventTime) {this.word word;this.count count;this.eventTime eventTime;}Overridepublic String toString() {return WC{ word word \ , count count };}}}猜想1水印延迟 2s 达到所以会在第 5 2 7s 时认为 [ 0 ,5 ) 窗口的数据全部到齐并触发窗口计算。// 往 Kafka 中写入数据{word:a,count:1,time:1604286560} //2020-11-02 11:09:20{word:a,count:1,time:1604286561} //2020-11-02 11:09:21{word:a,count:1,time:1604286562} //2020-11-02 11:09:22{word:a,count:1,time:1604286566} //2020-11-02 11:09:26{word:a,count:1,time:1604286567} //2020-11-02 11:09:27 (触发了窗口计算)控制台输出分析通过测试发现最后在第 7s 也就是 11:09:27 时触发了窗口计算这符合了我们的猜想一。水印延迟 2s 达到所以会在第 5 2 7s 时认为 [ 0 ,5 ) 窗口的数据全部到齐并触发窗口计算。计算结果为3这是因为只有最前面的3条数据属于 [0,5) 窗口计算范围之内。猜想2设置了窗口延迟2秒那么只要在水印之后到窗口允许延迟的时间范围内达到且属于 [ 0,5) 窗口的迟到数据会被加入到窗口中且再次触发窗口运算// 继续往 Kafka 中写入数据{word:a,count:1,time:1604286568} //2020-11-02 11:09:28 时间到达了第 8 秒{word:a,count:1,time:1604286563} //2020-11-02 11:09:23 模拟一个在水印之后、在窗口允许延迟范围内、且属于[0,5) 窗口的迟到数据该数据还是会触发并参与到[0,5) 窗口的计算控制台输出新增了一行// 我们再继续往 Kafka 中写入数据{word:a,count:1,time:1604286569} //2020-11-02 11:09:29 时间到达第9秒{word:a,count:1,time:1604286563} //2020-11-02 11:09:23 模拟一个在水印之后且超出窗口允许延迟范围、且属于[0,5) 窗口的迟到数据该数据不会参与和触发[0,5)窗口计算查看控制台并没有发现新的输出打印。解析水印因延迟在第 7s 到达之后会触发[0,5) 窗口计算如果没有设置窗口延迟的情况下水印之后迟到且属于 [0,5) 窗口的数据会被丢弃。上面我们实验设置窗口延迟 2s实现的效果就是在水印之后窗口允许延迟时间之内(7 2 9s 之间)迟到且属于 [0,5) 窗口的数据还是会触发一次窗口计算并参与到窗口计算中。而在 9s 之后也就是超过窗口允许延时时间那么迟到且属于[0,5)的数据就会被丢弃。总结WaterMark 到达之前窗口在攒数据不会触发计算。WaterMark 等于 windowEndTime 时第一次触发窗口计算。WaterMark 到达之后allowlateness之前如果来了数据每条数据都会触发窗口计算。超过了allowlateness之后到达的迟到数据会丢弃。水印用于解决乱序问题保证数据的完整性。而之所以有allowlateness的出现是因为如果WaterMark 加大会导致窗口计算延迟。WaterMark 设定的时间是第一次触发窗口计算的时间。allowlateness 表示WaterMark 触发窗口计算以后还可以再等多久的迟到数据每次符合条件的数据到达都会再次触发一次窗口计算。allowlateness 是在 Watermark 基础上再做了一层迟到数据的保证。【责任编辑赵宁宁 TEL(010)68476606】点赞 0