建站网站免费,西安哪里找做网站公司,通过网站如何做海外贸易,做电影网站怎么挣钱场景
自定义Map或者别的算子的时候#xff0c;有时候需要定义一些类变量#xff0c;在flink内部高并发的情况下需要正确理解这些变量的行为
代码
package com.pg.function;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common…场景
自定义Map或者别的算子的时候有时候需要定义一些类变量在flink内部高并发的情况下需要正确理解这些变量的行为
代码
package com.pg.function;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;public class FlinkFunction {//对于自定义函数中的变量只有内置的状态是完全按照flink内置的 keyBy行为来的//如果是自定义的缓存比如ArrayList 则可能不会按照预期的行为public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamString dataStream env.fromElements( b,b,b,c,c,c,d,d,d);dataStream.keyBy(x-{return x;}).map(new MyMap()).print();env.execute();}}class MyMap extends RichMapFunctionString, String {public ArrayListString list new ArrayList();
// public ValueStateInteger counter;//存储数据条数
// public ValueStateString element;//存储临时数据
// Override
// public void open(Configuration parameters) throws Exception {
// counter getRuntimeContext().getState(new ValueStateDescriptorInteger(counter, Types.INT));
// element getRuntimeContext().getState(new ValueStateDescriptor(element, Types.STRING));
// }Overridepublic String map(String s) throws Exception {list.add(s);if(list.size()2){String re list.toString();list.clear();return re;}else {return null;}
// if (counter.value() null) {
// counter.update(1);//遇见第一条数据的时候计数器为1
// } else {
// counter.update(counter.value() 1);
// }
// if (element.value() null) {
// element.update(s);//element只存储上一次到来的数据
// }else {
// element.update(element.value()s);
// }
// if (counter.value() 2) {
// String re element.value();
// //发出结果之后清楚状态
// counter.clear();
// element.clear();
// return re;
// }else {
// return null;
// }}
}
分析
keyBy之后理论上相同key的会在map中用同样的处理逻辑我们的预期行为是输出bb,cc,dd 但是用ArrayList实现的逻辑最终输出却是bb,bc,cc,dd 用ValueState的输出是bb,cc,dd 这说明了keBy后的逻辑ArrayList不会按照预期的行为执行。这是因为在flink中当多个并发的时候多个key如果落入同一个线程 则当前线程的valueState是和某一个key绑定的符合flink预期行为但是ArrayList以及其它你定义的变量则不做保证, 它是线程级别的局部变量, 这点要注意。