哪里有网站建设工程,wordpress 上传图片 出错,wordpress 主题 响应,工程建设项目管理系统平台背景
在flink中对两个流进行connect之后进行出处理的场景很常见#xff0c;我们本文就以书中的一个例子为例说明下实现一个CoProcessFunction的一些要点
实现CoProcessFunction的一些要点
这个例子举例的是当收到某个传感器放行的控制消息时#xff0c;从传感器传来的温度…背景
在flink中对两个流进行connect之后进行出处理的场景很常见我们本文就以书中的一个例子为例说明下实现一个CoProcessFunction的一些要点
实现CoProcessFunction的一些要点
这个例子举例的是当收到某个传感器放行的控制消息时从传感器传来的温度流消息会被运行向下游传递一段时间
/*** 展示CoProcessFunctiononTimer使用方法的例子*/
public class CoProcessFunctionTimers {public static void main(String[] args) throws Exception {// set up the streaming execution environmentStreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// use event time for the applicationenv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 控制消息流允许传感器消息流通过指定长度的时间DataStreamTuple2String, Long filterSwitches env.fromElements(// forward readings of sensor_2 for 10 secondsTuple2.of(sensor_2, 10_000L),// forward readings of sensor_7 for 1 minuteTuple2.of(sensor_7, 60_000L));// 传感器消息流DataStreamSensorReading readings env// SensorSource generates random temperature readings.addSource(new SensorSource());//传感器消息流connet控制消息流并且按照传感器id作为key进行分组DataStreamSensorReading forwardedReadings readings//连接控制消息流.connect(filterSwitches)// 按照传感器id分组.keyBy(r - r.id, s - s.f0)// 应用CoProcessFunction onTimer函数.process(new ReadingFilter());forwardedReadings.print();env.execute(Filter sensor readings);}//应用CoProcessFunction onTimer函数,这已经按照key传感器id分好组public static class ReadingFilter extends CoProcessFunctionSensorReading, Tuple2String, Long, SensorReading {// 传感器开关状态--键值分区状态,key是传感器idprivate ValueStateBoolean forwardingEnabled;// 保存传感器开关持续时间的状态--键值分区状态,key是传感器idprivate ValueStateLong disableTimer;// 初始化键值分区状态 key是传感器idpublic void open(Configuration parameters) throws Exception {forwardingEnabled getRuntimeContext().getState(new ValueStateDescriptor(filterSwitch, Types.BOOLEAN));disableTimer getRuntimeContext().getState(new ValueStateDescriptorLong(timer, Types.LONG));}Overridepublic void processElement1(SensorReading r, Context ctx, CollectorSensorReading out) throws Exception {// 处理传感器消息流首先检查key是传感器id对应的键值分区状态如果开启那么这个传感器消息就可以正常通过Boolean forward forwardingEnabled.value();if (forward ! null forward) {out.collect(r);}}Overridepublic void processElement2(Tuple2String, Long s, Context ctx, CollectorSensorReading out) throws Exception {//控制流消息过来后更新键值分区的开关状态为true, key是传感器idforwardingEnabled.update(true);//控制流消息过来后更新键值分区的开关状态为true的持续时长的定时器, key是传感器idlong timerTimestamp ctx.timerService().currentProcessingTime() s.f1;Long curTimerTimestamp disableTimer.value();if (curTimerTimestamp null || timerTimestamp curTimerTimestamp) {// remove current timerif (curTimerTimestamp ! null) {ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp);}// register new timerctx.timerService().registerProcessingTimeTimer(timerTimestamp);disableTimer.update(timerTimestamp);}}// 键值开关状态的持续时间定时器key是传感器id,注意在ontimer方法中也可以通过out.collect的方式向下游算子发送消息public void onTimer(long ts, OnTimerContext ctx, CollectorSensorReading out) throws Exception {// 定时器时间到了之后清理掉传感器的开关状态forwardingEnabled.clear();disableTimer.clear();}}
}以上就是实现一个CoProcessFunction的大概逻辑