广州响应式网站,网站开发属于哪个税收分类,推广普通话手抄报文字内容,博罗做网站技术摘自Apache Flink官网 最早的streaming 架构是storm的lambda架构 分为三个layer batch layerserving layerspeed layer一、在streaming中Flink支持的通知时间 Flink官网写了个了解streaming和各种时间的博客 https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-1… 摘自Apache Flink官网 最早的streaming 架构是storm的lambda架构 分为三个layer batch layerserving layerspeed layer 一、在streaming中Flink支持的通知时间 Flink官网写了个了解streaming和各种时间的博客 https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101#F2 1、Processing time执行时候的机器系统时间。 如果使用时间窗口的话如果一个应用在9:15开始那么第一次的结束时间在10:00. 然后是10:0011:00, 之后都是整点。就第一个点比较特殊 2、Event time每一个Event在其设备上产生的时间是在进入Flink之前的时间。 可以从data里面提取出来Event time的程序必须声明怎么产生Event Time Watermarks。Event time处理会发生延时因为有可能有的Event没有到达如果所有的events都到达了那么event time operations会按照预期的执行 3、Ingestion timeevents进入Flink的时间 在source算子每一个记录得到当前算子的时间基于时间的操作根据这个时间。记录时间有点开销因为是在source上但是非常可靠。因为如果是processing time的话有可能机器的local time不一样Ingestion time和event time不一样这个不能处理过期时间4、watermark在Flink中Event time程序衡量执行的是watermarks watermark携带了时间戳watermark在source function之后产生每一个并行的子任务独立的产生watermarks可以设置迟到时间来容忍迟到的watermak注册watermark的代码 1 public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarksOrderRecord {2 private final long maxOutOfOrderness 3500; // 3.5 seconds3 4 private long currentMaxTimestamp;5 6 Override7 public long extractTimestamp(OrderRecord record, long previousElementTimestamp) { // 将数据中的时间戳字段long 类型精确到毫秒赋给 timestamp 变量此处是 OrderRecord 的 timestamp 字段8 long timestamp record.timestamp;9 currentMaxTimestamp Math.max(timestamp, currentMaxTimestamp);
10 return timestamp;
11 }
12
13 Override
14 public Watermark getCurrentWatermark() { // return the watermark as current highest timestamp minus the out-of-orderness bound
15 return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
16 }
17 } 5、Late Elements迟到元素。即使在watermarkk已经产生了之后仍然有迟到元素 设置很长的延迟时间不太实际默认上Late Elements是drop掉的Flink支持allowedLateness在被drop前可以容忍的最大延迟时间如果设置了allowedLateness当迟到元素到达的时候会再计算一遍窗口也可以设置side output将废弃的数据当成side output6、idling sources: 在一段时间内watermark没有到来窗口内的元素就不执行这就是idling sources 二、生成TimeStamps / Watermarks 1、指派timestamps 这部分通常在实例中的一些filed内进行accessing/extracting the timestamp。 2、生成timestamps 和 watermark的方法 Directly in the data source.通过watermark 和 timestamp generator3、在source下生成timestamps和watermark 需要使用collectWithTimestamp方法在SourceContext下面watermark需要使用emitWatermark如果使用了generator那么source生成的watermark和timestamp会被复写 Java Code 1 final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();2 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);3 4 DataStreamMyEvent stream env.readFile(5 myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,6 FilePathFilter.createDefaultFilter(), typeInfo);7 8 DataStreamMyEvent withTimestampsAndWatermarks stream9 .filter( event - event.severity() WARNING )
10 .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
11
12 withTimestampsAndWatermarks
13 .keyBy( (event) - event.getGroup() )
14 .timeWindow(Time.seconds(10))
15 .reduce( (a, b) - a.add(b) )
16 .addSink(...); 三、预定义的TimeStamp Extractor和Watermark Emmiter 1、最简单的watermark generator 如果并行数据是升序的那么最简单的方法是使用 AscendingTimestampExtractor。即便是kafka消息源如果每个partition的消息是升序的那么在shuffle阶段会把每个partition的watermark正确的进行shuffle。 1 DataStreamMyEvent stream ...2 3 DataStreamMyEvent withTimestampsAndWatermarks 4 stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractorMyEvent() {5 6 Override7 public long extractAscendingTimestamp(MyEvent element) {8 return element.getCreationTime();9 }
10 }); 2、允许延迟的watermark 可以设定固定的延时时间延迟迟到时间戳 - 上一个元素的watermark。如果延迟 lateness会被忽略。 1 DataStreamMyEvent stream ...2 3 DataStreamMyEvent withTimestampsAndWatermarks 4 stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorMyEvent(Time.seconds(10)) {5 6 Override7 public long extractTimestamp(MyEvent element) {8 return element.getCreationTime();9 }
10 }); 转载于:https://www.cnblogs.com/ylxn/p/10617357.html