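In "Learning PyFlink from Scratch: Tumbling Time Windows" we introduced tumbling time windows, in which no element is ever counted twice. This section introduces sliding time windows, in which an element can be counted in more than one window.

For background on sliding windows, first see "Learning PyFlink from Scratch: Sliding Count Windows". The figure below illustrates a sliding count window.

We saw there that a sliding count window may never fire if it does not collect enough elements. A sliding time window avoids this problem, because it fires once its time span has elapsed rather than after a fixed number of elements. All we need to do is switch the window to a time-based one.

For the code, we start from the program in "Learning PyFlink from Scratch: Tumbling Time Windows", replace TumblingProcessingTimeWindows with SlidingProcessingTimeWindows, and pass a second argument, the slide, Time.milliseconds(1). The result is a window that is 2 ms long and advances by 1 ms at a time.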
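To see why a sliding window counts some elements more than once, here is a minimal pure-Python sketch of window assignment. It does not use PyFlink: `window_starts` and `count_per_window` are hypothetical helpers, the alignment formula is modeled on the arithmetic Flink's window assigners use (window starts aligned to multiples of the slide), and the event timestamps are made up for illustration.

```python
from collections import Counter
from typing import Dict, List, Tuple


def window_starts(timestamp: int, size: int, slide: int, offset: int = 0) -> List[int]:
    """Start times of every [start, start + size) window that contains timestamp."""
    # Latest window start at or before timestamp, aligned to multiples of slide (plus offset)
    last_start = timestamp - (timestamp - offset + slide) % slide
    # Step back one slide at a time while the window still covers timestamp
    return sorted(range(last_start, timestamp - size, -slide))


def count_per_window(events: List[Tuple[int, str]], size: int, slide: int) -> Dict[Tuple[str, int, int], int]:
    """Count elements per (key, window_start, window_end), like SumWindowFunction does per window."""
    counts: Counter = Counter()
    for timestamp, key in events:
        for start in window_starts(timestamp, size, slide):
            counts[(key, start, start + size)] += 1
    return dict(sorted(counts.items()))


# Hypothetical processing times (ms) at which three "A" elements are seen
events = [(100, "A"), (101, "A"), (101, "A")]

# Tumbling: slide == size, so every element lands in exactly one window
print(count_per_window(events, size=2, slide=2))  # {('A', 100, 102): 3}

# Sliding: 2 ms windows advancing by 1 ms, so every element lands in 2 windows
print(count_per_window(events, size=2, slide=1))  # {('A', 99, 101): 1, ('A', 100, 102): 3, ('A', 101, 103): 2}
```

With the slide equal to the size the windows tile the time axis without gaps or overlap. With a 1 ms slide each element belongs to size / slide = 2 windows, so the per-window counts add up to twice the number of elements; this is the same overlap that shows up in the job's output.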
Complete code
```python
from typing import Iterable

from pyflink.common import Types, Time
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import TimeWindow, SlidingProcessingTimeWindows


class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
        # Print every element that fell into this window, followed by the window bounds
        print(*inputs, window)
        # Emit (key, number of elements in this window)
        return [(key, len([e for e in inputs]))]


word_count_data = [("A", 2), ("A", 1), ("A", 4), ("A", 3), ("A", 6), ("A", 5), ("A", 7),
                   ("A", 8), ("A", 9), ("A", 10), ("A", 11), ("A", 12), ("A", 13), ("A", 14),
                   ("A", 15), ("A", 16), ("A", 17), ("A", 18), ("A", 19), ("A", 20)]


def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
    # define the source
    # mapping
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()

    # keying
    keyed = source.key_by(lambda i: i[0])

    # reducing: 2 ms windows that slide forward by 1 ms
    reduced = keyed.window(SlidingProcessingTimeWindows.of(Time.milliseconds(2), Time.milliseconds(1))) \
                   .apply(SumWindowFunction(),
                          Types.TUPLE([Types.STRING(), Types.INT()]))

    # define the sink
    reduced.print()

    # submit for execution
    env.execute()


if __name__ == "__main__":
    word_count()
```

Output
Running the code above twice gives different results each time, and some elements are counted in more than one window. For example, in the first run ('A', 2), ('A', 1) and ('A', 4) appear both in the window starting at 1698773292650 and in the one starting at 1698773292651. Because the windows use processing time, which elements share a window depends on exactly when each element is processed, so the window boundaries and counts change from run to run.

```
('A', 2) ('A', 1) ('A', 4) TimeWindow(start=1698773292650, end=1698773292652)
('A', 2) ('A', 1) ('A', 4) ('A', 3) ('A', 6) ('A', 5) ('A', 7) ('A', 8) ('A', 9) ('A', 10) ('A', 11) TimeWindow(start=1698773292651, end=1698773292653)
(A,3)
(A,11)
('A', 3) ('A', 6) ('A', 5) ('A', 7) ('A', 8) ('A', 9) ('A', 10) ('A', 11) ('A', 12) ('A', 13) ('A', 14) ('A', 15) ('A', 16) ('A', 17) ('A', 18) ('A', 19) ('A', 20) TimeWindow(start=1698773292652, end=1698773292654)
(A,17)

('A', 2) ('A', 1) ('A', 4) TimeWindow(start=1698773319933, end=1698773319935)
('A', 2) ('A', 1) ('A', 4) ('A', 3) ('A', 6) ('A', 5) ('A', 7) ('A', 8) ('A', 9) ('A', 10) ('A', 11) ('A', 12) TimeWindow(start=1698773319934, end=1698773319936)
(A,3)
(A,12)
('A', 3) ('A', 6) ('A', 5) ('A', 7) ('A', 8) ('A', 9) ('A', 10) ('A', 11) ('A', 12) ('A', 13) ('A', 14) ('A', 15) ('A', 16) ('A', 17) ('A', 18) ('A', 19) ('A', 20) TimeWindow(start=1698773319935, end=1698773319937)
(A,17)
```

References
https://nightlies.apache.org/flink/flink-docs-master/api/python/reference/pyflink.datastream/api/pyflink.datastream.window.SlidingProcessingTimeWindows.html#pyflink.datastream.window.SlidingProcessingTimeWindows