青岛seo建站,成都食品网站开发,三维网站是怎么做的,企业网站建设及运营现状分析1
#x1f3b0;#x1f3b2;#x1f579;️ #x1f3b0;时间、窗口
#x1f3b2;窗口
#x1f579;️是啥
Flink 是一种流式计算引擎#xff0c;主要是来处理无界数据流的#xff0c;数据源源不断、无穷无尽。想要更加方便高效地处理无界流#xff0c;一种方式就…
1
️ 时间、窗口
窗口
️是啥
Flink 是一种流式计算引擎主要是来处理无界数据流的数据源源不断、无穷无尽。想要更加方便高效地处理无界流一种方式就是将无限数据切割成有限的“数据块”进行处理这就是所谓的“窗口”Window。
在Flink中窗口其实并不是一个“框”应该把窗口理解成一个“桶”。在Flink中窗口可以把流切割成有限大小的多个“存储桶”bucket)每个数据都会分发到对应的桶中当到达窗口结束时间时就对每个桶中收集的数据进行计算处理。 Flink 中窗口并不是静态准备好的而是动态创建——当有落在这个窗口区间范围的数据达到时才创建对应的窗口。另外这里我们认为到达窗口结束时间时窗口就触发计算并关闭 事实上“触发计算”和“窗口关闭”两个行为也可以分开 ️分类
窗口本身是截取有界数据的一种方式所以窗口一个非常重要的信息其实就是“怎样截取数据”。换句话说就是以什么标准来开始和结束数据的截取我们把它叫作窗口的“驱动类型”。 1时间窗口Time Window 时间窗口以时间点来定义窗口的开始start和结束end所以截取出的就是某一时间段的数据。到达结束时间时窗口不再收集数据触发计算输出结果并将窗口关闭销毁。所以可以说基本思路就是“定点发车”。 2计数窗口Count Window 计数窗口基于元素的个数来截取数据到达固定的个数时就触发计算并关闭窗口。每个窗口截取数据的个数就是窗口的大小。基本思路是“人齐发车”。 根据分配数据的规则窗口的具体实现可以分为 4 类滚动窗口Tumbling Window、滑动窗口Sliding Window、会话窗口Session Window以及全局窗口Global Window 滚动窗口有固定的大小是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠也不会有间隔是“首尾相接”的状态。这是最简单的窗口形式每个数据都会被分配到一个窗口而且只会属于一个窗口。比如我们可以定义一个长度为1小时的滚动时间窗口那么每个小时就会进行一次统计或者定义一个长度为10的滚动计数窗口就会每10个数进行一次统计。 滚动窗口也可以看作是一种特殊的滑动窗口——窗口大小等于滑动步长size slide。 滑动窗口的大小也是固定的。但是窗口之间并不是首尾相接的而是可以“错开”一定的位置。定义滑动窗口的参数有两个除去窗口大小window size之外还有一个“滑动步长”window slide它其实就代表了窗口计算的频率。窗口在结束时间触发计算输出结果那么滑动步长就代表了计算频率。 会话窗口是基于“会话”session来来对数据进行分组的。会话窗口只能基于时间来定义。会话窗口中最重要的参数就是会话的超时时间也就是两个会话窗口之间的最大距离。如果相邻两个数据到来的时间间隔Gap小于指定的大小size那说明还在保持会话它们就属于同一个窗口如果gap大于size那么新来的数据就应该属于新的会话窗口而前一个窗口就应该关闭了。 “全局窗口”这种窗口全局有效会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时候默认是不会做触发计算的。如果希望它能对数据进行计算处理还需要自定义“触发器”Trigger。 ️api
在定义窗口操作之前首先需要确定到底是基于按键分区Keyed的数据流KeyedStream 来开窗还是直接在没有按键分区的 DataStream 上开窗。也就是说在调用窗口算子之前是否有 keyBy 操作。
1按键分区窗口Keyed Windows 经过按键分区 keyBy 操作后数据流会按照key 被分为多条逻辑流logical streams这就是KeyedStream。基于KeyedStream 进行窗口操作时窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务而窗口操作会基于每个 key 进行单独的处理。所以可以认为每个 key 上都定义了一组窗口各自独立地进行统计计算。 stream.keyBy(...) .window(...) 2非按键分区Non-Keyed Windows 如果没有进行 keyBy那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务task上执行就相当于并行度变成了1。 stream.windowAll(...) 注意对于非按键分区的窗口操作手动调大窗口算子的并行度也是无效的windowAll 本身就是一个非并行的操作。 stream.keyBy(key selector) .window(window assigner) .aggregate(window function) 其中.window()方法需要传入一个窗口分配器它指明了窗口的类型而后面的.aggregate()方法传入一个窗口函数作为参数它用来定义窗口具体的处理逻辑。窗口分配器有各种形式而窗口函数的调用方法也不只.aggregate()一种
️窗口分配器 // 窗口分配器 指定用什么窗口 时间、计数————滚动、滑动、会话KS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));KS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));KS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));KS.window(GlobalWindows.create());KS.countWindow(5); // 窗口数据长度5KS.countWindow(5, 2); // 滑动 ️窗口函数
定义了窗口分配器我们只是知道了数据属于哪个窗口可以将数据收集起来了至于收集起来到底要做什么 其实还完全没有头绪。所以在窗口分配器之后必须再接上一个定义窗口如何进行计算的操作这就是所谓的“窗口函数”window functions。 增量聚合Reduce public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString stream env.socketTextStream(localhost, 7777);KeyedStreamWaterSensor, String KS stream.map(new MapFunctionString, WaterSensor() {Overridepublic WaterSensor map(String s) throws Exception {String[] list s.split(,);return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));}}).keyBy(value - value.id);// 窗口分配器 指定用什么窗口 时间、计数————滚动、滑动、会话WindowedStreamWaterSensor, String, TimeWindow window KS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));window.reduce(new ReduceFunctionWaterSensor() {Overridepublic WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception {System.out.println(reduce, t1: t2 t1: t2);return new WaterSensor(t1.getId(), t1.getTs(), t1.getVc()t2.getVc());}}).print();env.execute();}
Aggregate
ReduceFunction 可以解决大多数归约聚合的问题但是这个接口有一个限制就是聚合 状态的类型、输出结果的类型都必须和输入数据类型一样。
AggregateFunction 可以看作是 ReduceFunction 的通用版本这里有三种类型
输入类型 IN、累加器类型ACC和输出类型OUT。输入类型 IN 就是输入流中元素的数据类型累加器类型 ACC 则是我们进行聚合的中间状态类型而输出类型OUT当然就是最终计算结果 的类型了。 接口中有四个方法 ⚫ createAccumulator()创建一个累加器这就是为聚合创建了一个初始状态每个聚 合任务只会调用一次。 ⚫ add()将输入的元素添加到累加器中。 ⚫ getResult()从累加器中提取聚合的输出结果。 ⚫ merge()合并两个累加器并将合并后的状态作为一个累加器返回 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString stream env.socketTextStream(localhost, 7777);KeyedStreamWaterSensor, String KS stream.map(new MapFunctionString, WaterSensor() {Overridepublic WaterSensor map(String s) throws Exception {String[] list s.split(,);return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));}}).keyBy(value - value.id);// 窗口分配器 指定用什么窗口 时间、计数————滚动、滑动、会话WindowedStreamWaterSensor, String, TimeWindow window KS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));// 类型形参:// IN – The type of the values that are aggregated (input values)// ACC – The type of the accumulator (intermediate aggregate state).// OUT – The type of the aggregated resultSingleOutputStreamOperatorString aggregate window.aggregate(new AggregateFunctionWaterSensor, Integer, String() {Overridepublic Integer createAccumulator() {System.out.println(createAccumulator());return 0;}Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println(add);return value.getVc() accumulator;}Overridepublic String getResult(Integer accumulator) {return getResult accumulator.toString();}Overridepublic Integer merge(Integer a, Integer b) {// 会话窗口才用得到System.out.println(用不到的merge);return null;}});aggregate.print();env.execute();} 全窗口函数
有些场景下我们要做的计算必须基于全部的数据才有效这时做增量聚合就没什么意 义了另外输出的结果有可能要包含上下文中的一些信息比如窗口的起始时间这是增 量聚合函数做不到的。
全窗口函数需要先收集窗口中的数据并在内部缓存起来等到窗口 要输出结果的时候再取出数据进行计算。WindowFunction 和 ProcessWindowFunction。
1窗口函数WindowFunction
WindowFunction 字面上就是“窗口函数”它其实是老版本的通用窗口函数接口。我们 可以基于 WindowedStream 调用.apply()方法传入一个 WindowFunction 的实现类。不过 WindowFunction 能提供的上下文信息较少也没有更高级的功能。事实上它的作 用可以被ProcessWindowFunction 全覆盖所以之后可能会逐渐弃用。
2处理窗口函数ProcessWindowFunction
ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最 底层”是因为除了可以拿到窗口中的所有数据之外ProcessWindowFunction 还可以获取到 一个“上下文对象”Context。这个上下文对象非常强大不仅能够获取窗口信息还可以 访问当前的时间和状态信息。这里的时间就包括了处理时间processing time和事件时间水位线event time watermark。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富其 实就是一个增强版的 WindowFunction。 事实上ProcessWindowFunction 是 Flink 底层 API——处理函数process function中的 一员关于处理函数我们会在后续章节展开讲解。 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString stream env.socketTextStream(localhost, 7777);KeyedStreamWaterSensor, String KS stream.map(new MapFunctionString, WaterSensor() {Overridepublic WaterSensor map(String s) throws Exception {String[] list s.split(,);return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));}}).keyBy(value - value.id);WindowedStreamWaterSensor, String, TimeWindow window KS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));/*** 类型形参:* IN – The type of the input value.* OUT – The type of the output value.* KEY – The type of the key.* W – The type of Window that this window function can be applied on.*/SingleOutputStreamOperatorString process window.process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {/**** param s The key 分组的key* param context The context in which the window is being evaluated.* param elements The elements in the window being evaluated.* param out A collector for emitting elements.* throws Exception*/Overridepublic void process(String s, ProcessWindowFunctionWaterSensor, String, String, TimeWindow.Context context, IterableWaterSensor elements, CollectorString out) throws Exception {// 上下文可以拿到的东西long start context.window().getStart();long end context.window().getEnd();String StartTime DateFormatUtils.format(start, yyyy-MM-dd HH:mm:ss.SSS);String EndTime DateFormatUtils.format(end, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(s 窗口[ StartTime ——— EndTime ] 有 count 条数据);}});process.print();/*** 16 s1窗口[2023-10-16 10:33:30.000———2023-10-16 10:33:35.000] 有 1 条数据* 16 s1窗口[2023-10-16 10:33:40.000———2023-10-16 10:33:45.000] 有 3 条数据* 16 s1窗口[2023-10-16 10:33:50.000———2023-10-16 10:33:55.000] 有 6 条数据* 16 s1窗口[2023-10-16 10:33:55.000———2023-10-16 10:34:00.000] 有 7 条数据*/env.execute();}
agg、pro合体 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString stream env.socketTextStream(localhost, 7777);KeyedStreamWaterSensor, String KS stream.map(new MapFunctionString, WaterSensor() {Overridepublic WaterSensor map(String s) throws Exception {String[] list s.split(,);return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));}}).keyBy(value - value.id);WindowedStreamWaterSensor, String, TimeWindow window KS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));SingleOutputStreamOperatorString process window.aggregate(new MyAgg(), new MyProcess());process.print();env.execute();}public static class MyAgg implements AggregateFunctionWaterSensor, Integer, String{Overridepublic Integer createAccumulator() {System.out.println(createAccumulator());return 0;}Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println(add);return value.getVc() accumulator;}Overridepublic String getResult(Integer accumulator) {return getResult accumulator.toString();}Overridepublic Integer merge(Integer a, Integer b) {// 会话窗口才用得到System.out.println(用不到的merge);return null;}}public static class MyProcess extends ProcessWindowFunctionString, String, String, TimeWindow {Overridepublic void process(String s, ProcessWindowFunctionString, String, String, TimeWindow.Context context, IterableString elements, CollectorString out) throws Exception {// 上下文可以拿到的东西long start context.window().getStart();long end context.window().getEnd();String StartTime DateFormatUtils.format(start, yyyy-MM-dd HH:mm:ss.SSS);String EndTime DateFormatUtils.format(end, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(s 窗口[ StartTime ——— EndTime ] 有 count 条数据elements.toString());}}
️触发器、移除器*
上述已经默认实现
触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”本质上就是执行窗 口函数所以可以认为是计算得到结果并输出的过程。
基于WindowedStream调用.trigger()方法就可以传入一个自定义的窗口触发器Trigger。 stream.keyBy(...) .window(...) .trigger(new MyTrigger()) 移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法就 可以传入一个自定义的移除器Evictor。Evictor是一个接口不同的窗口类型都有各自预实 现的移除器。 stream.keyBy(...) .window(...).evictor(new MyEvictor()) 时间语义瞎起名 到底是以那种时间作为衡量标准就是所谓的“时间语义”。 在实际应用中事件时间语义真正产生的时间会更为常见。一般情况下业务日志数据中都会记录数据生成的时间戳timestamp它就可以作为事件时间的判断基础。 从 Flink1.12版本开始Flink已经将事件时间作为默认的时间语义了。 水位线
在窗口的处理过程中我 们 可以基于数据的时间戳自定义 一 个“逻辑时钟”。这个时钟的时间不会自动流逝它的时间进 展就是靠着新到数据的时间戳 来推动的 这样的好处在于计算的过程可以完全不依赖处理时间系统时间不论什么时候进行统计 处理得到的结果都是正确的。而一般实时流处理的场景中事件时间可以基本与处理时间保持同 步只是略微有一点延迟同时保证了窗口计算的正确性。 在 Flink 中用来衡量事件时间进展的标记就被称作“水位线”Watermark。 具体实现上水位线可以看作一条特殊的数据记录它是插入到数据流中的一个标记点 主要内容就是一个时间戳用来指示当前的事件时间。而它插入流中的位置就应该是在某 个数据到来之后这样就可以从这个数据中提取时间戳作为当前水位线的时间戳了。
1有序流中的水位线
理想状态数据量小数据应该按照生成的先后顺序进入流中每条数据产生一个水位线 实际应用中如果当前数据量非常大且同时涌来的数据时间差会非常小比如几毫秒往 往对处理计算也没什么影响。所以为了提高效率一般会每隔一段时间生成一个水位线。 2乱序流中的水位线
乱序 数据量小在分布式系统中数据在节点间传输会因为网络传输延迟的不确定性导致顺序发生改变这就是 所谓的“乱序数据”。 情况是数据乱序也就是说只有数据的时间戳比当前时钟大才能推动时钟前进这时才插入水位线。
乱序 数据量大如果考虑到大量数据同时到来的处理效率我们同样可以周期性地生成水位线。这时 只需要保存一下之前所有数据中的最大时间戳需要插入水位线时就直接以它作为时间戳生成新的水位 线。 我们无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据我们也可 以等上一段时间比如2秒也就是用当前已有数据的最大时间戳减去2秒就是要插入的水位线的时间戳。 这样的话9秒的数据到来之后事件时钟不会直接推进到9秒而是进展到了7秒必须等到11秒的数据到来 之后事件时钟才会进展到9秒这时迟到数据也都已收集齐0~9秒的窗口就可以正确计算结果了
️生成水位线原则
完美的水位线是“绝对正确”的也就是一个水位线一旦出现就表示这个时间之前的 数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确就必须等足够长的时间这会带来更高的延迟。
如果我们对准确性完全不考虑、一味地追求处理速度可以直接使用处理时间语义 这在理论上可以得到最低的延迟。 所以 Flink中的水位线其实是流处理中对低延迟和结果正确性的一个权衡机制而且把 控制的权力交给了程序员我们可以在代码中定义水位线的生成策略。 ️内置水位线
对于有序流主要特点就是时间戳单调增长所以永远不会出现迟到数据的问题。这是 周期性生成水位线的最简单的场景直接调用 WatermarkStrategy.forMonotonousTimestamps() 方法就可以实现
注意并行度输出 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceString stream env.socketTextStream(localhost, 7777);SingleOutputStreamOperatorWaterSensor DS stream.map(new MapFunctionString, WaterSensor() {Overridepublic WaterSensor map(String s) throws Exception {String[] list s.split(,);return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));}});// TODO 1.定义Watermark策略WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 1.1 指定watermark生成升序的watermark没有等待时间.WaterSensorforMonotonousTimestamps()// 1.2 指定 时间戳分配器从数据中提取.withTimestampAssigner(new SerializableTimestampAssignerWaterSensor() {Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {// 返回的时间戳要 毫秒System.out.println(数据 element ,recordTs recordTimestamp);return element.getTs() * 1000L;}});// TODO 2. 指定 watermark策略SingleOutputStreamOperatorWaterSensor sensorDSwithWatermark DS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor - sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}}).print();env.execute();/*** 数据WaterSensor{ids1, ts1, vc1},recordTs-9223372036854775808* 数据WaterSensor{ids1, ts2, vc1},recordTs-9223372036854775808* 数据WaterSensor{ids1, ts3, vc1},recordTs-9223372036854775808* 数据WaterSensor{ids1, ts4, vc1},recordTs-9223372036854775808* 数据WaterSensor{ids1, ts5, vc1},recordTs-9223372036854775808* keys1的窗口[1970-01-01 08:00:00.000,1970-01-01 08:00:05.000)包含4条数据[WaterSensor{ids1, ts1, vc1}, WaterSensor{ids1, ts2, vc1}, WaterSensor{ids1, ts3, vc1}, WaterSensor{ids1, ts4, vc1}]* 数据WaterSensor{ids1, ts6, vc1},recordTs-9223372036854775808* 数据WaterSensor{ids1, ts7, vc1},recordTs-9223372036854775808* 数据WaterSensor{ids1, ts9, vc1},recordTs-9223372036854775808* 数据WaterSensor{ids1, ts10, vc1},recordTs-9223372036854775808* keys1的窗口[1970-01-01 08:00:05.000,1970-01-01 08:00:10.000)包含4条数据[WaterSensor{ids1, ts5, vc1}, WaterSensor{ids1, ts6, vc1}, WaterSensor{ids1, ts7, vc1}, WaterSensor{ids1, ts9, vc1}]*/}
由于乱序流中需要等待迟到数据到齐所以必须设置一个固定量的延迟时间。这时生成 水位线的时间戳就是当前数据流中最大的时间戳减去延迟的结果相当于把表调慢当前 时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就 可以实现。这个方法需要传入一个 maxOutOfOrderness 参数表示“最大乱序程度”它表示 数据流中乱序数据时间戳的最大差值如果我们能确定乱序程度那么设置对应时间长度的 延迟就可以等到所有的乱序数据了。 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceString stream env.socketTextStream(localhost, 7777);SingleOutputStreamOperatorWaterSensor DS stream.map(new MapFunctionString, WaterSensor() {Overridepublic WaterSensor map(String s) throws Exception {String[] list s.split(,);return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));}});// TODO 1.定义Watermark策略WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 1.1 指定watermark生成乱的watermark没有等待时间.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3))// 1.2 指定 时间戳分配器从数据中提取.withTimestampAssigner((element, recordTimestamp) - {// 返回的时间戳要 毫秒System.out.println(数据 element ,recordTs recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperatorWaterSensor sensorDSwithWatermark DS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor - sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}}).print();env.execute();} 周期性水位生成器 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceString stream env.socketTextStream(localhost, 7777);SingleOutputStreamOperatorWaterSensor DS stream.map(new MapFunctionString, WaterSensor() {Overridepublic WaterSensor map(String s) throws Exception {String[] list s.split(,);return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));}});// TODO 1.定义Watermark策略WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 1.1 指定watermark生成乱的watermark没有等待时间.WaterSensorforGenerator(new WatermarkStrategyWaterSensor() {Overridepublic WatermarkGeneratorWaterSensor createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new MyWaterStrategy(3000L);}})// 1.2 指定 时间戳分配器从数据中提取.withTimestampAssigner((element, recordTimestamp) - {// 返回的时间戳要 毫秒System.out.println(数据 element ,recordTs recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperatorWaterSensor sensorDSwithWatermark DS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor - sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}}).print();env.execute();}public static class MyWaterStrategyT implements WatermarkGeneratorT {private long delay;private long maxTs;public MyWaterStrategy(long delay) {this.delay delay;this.maxTs Long.MIN_VALUEthis.delay1;}Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {maxTs Math.max(maxTs, eventTimestamp);}Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(maxTs- delay -1));}}
️并行水位线传递 而当一个任务接收到多个上游并行任务传递来的水位线时应该以 最小的那个作为当前任务的事件时钟。
在多个上游并行任务中如果有其中一个没有数据由于当前 Task 是以最小的那个作为 当前任务的事件时钟就会导致当前 Task 的水位线无法推进就可能导致窗口无法触发。这 时候可以设置空闲等待。 .withIdleness(Duration.ofSecond(3)) 迟到数据处理
Flink的窗口也允许迟到数据。当触发了窗口计算后会先计算当前的结果但是此时 并不会关闭窗口。
以后每来一条迟到数据就触发一次这条数据所在窗口计算(增量计算)。直到 wartermark 超过了窗口结束时间推迟时间此时窗口会真正关闭。 .window(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(3)) 允许迟到只能运用在 event time 上 时间的合流 可以发现根据某个 key 合并两条流与关系型数据库中表的 join 操作非常相近。事实 上Flink 中两条流的 connect 操作就可以通过 keyBy 指定键进行分组后合并实现了类似 于 SQL 中的 join 操作另外 connect 支持处理函数可以使用自定义实现各种需求其实已 经能够处理双流 join 的大多数场景。 不过处理函数是底层接口所以尽管 connect能做的事情多但在一些具体应用场景下还 是显得太过抽象了。比如如果我们希望统计固定时间内两条流数据的匹配情况那就需要 自定义来实现——其实这完全可以用窗口window来表示。为了更方便地实现基于时间的 合流操作Flink 的 DataStrema API 提供了内置的 join 算子。
️窗口联结Window Join public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// TODOSingleOutputStreamOperatorTuple2String, Integer DS1 env.fromElements(Tuple2.of(a, 1),Tuple2.of(a, 2),Tuple2.of(b, 7),Tuple2.of(b, 5),Tuple2.of(c, 3)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, IntegerforMonotonousTimestamps().withTimestampAssigner((value, ts) - (value.f1 * 1000L)));SingleOutputStreamOperatorTuple3String, Integer, Integer DS2 env.fromElements(Tuple3.of(a, 1, 1),Tuple3.of(a, 8, 1),Tuple3.of(b, 8, 1),Tuple3.of(b, 5, 1),Tuple3.of(c, 3, 1)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, IntegerforMonotonousTimestamps().withTimestampAssigner((value, ts) - value.f1 * 1000L));DataStreamString join DS1.join(DS2).where(x - x.f0)// ds1的keyBy.equalTo(x - x.f0)// ds2的keyBy.window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunctionTuple2String, Integer, Tuple3String, Integer, Integer, String() {/**** param first The element from first input.* param second The element from second input.* return* throws Exception*/Overridepublic String join(Tuple2String, Integer first, Tuple3String, Integer, Integer second) throws Exception {return first ------- second;}});join.print();env.execute();} ️间隔联结Interval Join
Flink 提供了一种叫作“间隔联结”interval join的合流操作。 顾名思义间隔联结的思路就是针对一条流的每个数据开辟出其时间戳前后的一段时间间 隔看这期间是否有来自另一条流的数据匹配。 案例需求在电商网站中某些用户行为往往会有短时间内的强关联。我们这里举一个 例子我们有两条流一条是下订单的流一条是浏览数据的流。我们可以针对同一个用户 来做这样一个联结。也就是使用一个用户的下订单的事件和这个用户的最近十分钟的浏览据进行一个联结查询。