电力网站建设,哲程软件,线上p2p网站建设,即墨做网站系列文章目录
Flink1.17实战教程#xff08;第一篇#xff1a;概念、部署、架构#xff09; Flink1.17实战教程#xff08;第二篇#xff1a;DataStream API#xff09; Flink1.17实战教程#xff08;第三篇#xff1a;时间和窗口#xff09; Flink1.17实战教程…系列文章目录
Flink1.17实战教程第一篇概念、部署、架构 Flink1.17实战教程第二篇DataStream API Flink1.17实战教程第三篇时间和窗口 Flink1.17实战教程第四篇处理函数 Flink1.17实战教程第五篇状态管理 Flink1.17实战教程第六篇容错机制 Flink1.17实战教程第七篇Flink SQL 文章目录 系列文章目录1. Flink中的状态1.1 概述1.2 状态的分类 2. 按键分区状态Keyed State2.1 值状态ValueState2.2 列表状态ListState2.3 Map状态MapState2.4 归约状态ReducingState2.5 聚合状态AggregatingState2.6 状态生存时间TTL 3. 算子状态Operator State3.1 列表状态ListState3.2 联合列表状态UnionListState3.3 广播状态BroadcastState 4. 状态后端State Backends4.1 状态后端的分类HashMapStateBackend/RocksDB4.2 如何选择正确的状态后端4.3 状态后端的配置 1. Flink中的状态
1.1 概述 1.2 状态的分类
1托管状态Managed State和原始状态Raw State Flink的状态有两种托管状态Managed State和原始状态Raw State。托管状态就是由Flink统一管理的状态的存储访问、故障恢复和重组等一系列问题都由Flink实现我们只要调接口就可以而原始状态则是自定义的相当于就是开辟了一块内存需要我们自己管理实现状态的序列化和故障恢复。 通常我们采用Flink托管状态来实现需求。
2算子状态Operator State和按键分区状态Keyed State 接下来我们的重点就是托管状态Managed State。 我们知道在Flink中一个算子任务会按照并行度分为多个并行子任务执行而不同的子任务会占据不同的任务槽task slot。由于不同的slot在计算资源上是物理隔离的所以Flink能管理的状态在并行任务间是无法共享的每个状态只能针对当前子任务的实例有效。 而很多有状态的操作比如聚合、窗口都是要先做keyBy进行按键分区的。按键分区之后任务所进行的所有计算都应该只针对当前key有效所以状态也应该按照key彼此隔离。在这种情况下状态的访问方式又会有所不同。 基于这样的想法我们又可以将托管状态分为两类算子状态和按键分区状态。 另外也可以通过富函数类Rich Function来自定义Keyed State所以只要提供了富函数类接口的算子也都可以使用Keyed State。所以即使是map、filter这样无状态的基本转换算子我们也可以通过富函数类给它们“追加”Keyed State。比如RichMapFunction、RichFilterFunction。在富函数中我们可以调用.getRuntimeContext()获取当前的运行时上下文RuntimeContext进而获取到访问状态的句柄这种富函数中自定义的状态也是Keyed State。从这个角度讲Flink中所有的算子都可以是有状态的。 无论是Keyed State还是Operator State它们都是在本地实例上维护的也就是说每个并行子任务维护着对应的状态算子的子任务之间状态不共享。
2. 按键分区状态Keyed State
按键分区状态Keyed State顾名思义是任务按照键key来访问和维护的状态。它的特点非常鲜明就是以key为作用范围进行隔离。
需要注意使用Keyed State必须基于KeyedStream。没有进行keyBy分区的DataStream即使转换算子实现了对应的富函数类也不能通过运行时上下文访问Keyed State。
2.1 值状态ValueState
顾名思义状态中只保存一个“值”value。ValueState本身是一个接口源码中定义如下
public interface ValueStateT extends State {T value() throws IOException;void update(T value) throws IOException;
}这里的T是泛型表示状态的数据内容可以是任何具体的数据类型。如果想要保存一个长整型值作为状态那么类型就是ValueState。 我们可以在代码中读写值状态实现对于状态的访问和更新。
T value()获取当前状态的值update(T value)对状态进行更新传入的参数value就是要覆写的状态值。
在具体使用时为了让运行时上下文清楚到底是哪个状态我们还需要创建一个“状态描述器”StateDescriptor来提供状态的基本信息。例如源码中ValueState的状态描述器构造方法如下
public ValueStateDescriptor(String name, ClassT typeClass) {super(name, typeClass, null);
}这里需要传入状态的名称和类型——这跟我们声明一个变量时做的事情完全一样。 案例需求检测每种传感器的水位值如果连续的两个水位值超过10就输出报警。
public class KeyedValueStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {// TODO 1.定义状态ValueStateInteger lastVcState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// TODO 2.在open方法中初始化状态// 状态描述器两个参数第一个参数起个名字不重复第二个参数存储的类型lastVcState getRuntimeContext().getState(new ValueStateDescriptorInteger(lastVcState, Types.INT));}Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {
// lastVcState.value(); // 取出 本组 值状态 的数据
// lastVcState.update(); // 更新 本组 值状态 的数据
// lastVcState.clear(); // 清除 本组 值状态 的数据// 1. 取出上一条数据的水位值(Integer默认值是null判断)int lastVc lastVcState.value() null ? 0 : lastVcState.value();// 2. 求差值的绝对值判断是否超过10Integer vc value.getVc();if (Math.abs(vc - lastVc) 10) {out.collect(传感器 value.getId() 当前水位值 vc ,与上一条水位值 lastVc ,相差超过10);}// 3. 更新状态里的水位值lastVcState.update(vc);}}).print();env.execute();}
}2.2 列表状态ListState
将需要保存的数据以列表List的形式组织起来。在ListState接口中同样有一个类型参数T表示列表中数据的类型。ListState也提供了一系列的方法来操作状态使用方式与一般的List非常相似。
Iterable get()获取当前的列表状态返回的是一个可迭代类型Iterableupdate(List values)传入一个列表values直接对状态进行覆盖add(T value)在状态列表中添加一个元素valueaddAll(List values)向列表中添加多个元素以列表values形式传入。
类似地ListState的状态描述器就叫作ListStateDescriptor用法跟ValueStateDescriptor完全一致。 案例针对每种传感器输出最高的3个水位值
public class KeyedListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {ListStateInteger vcListState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcListState getRuntimeContext().getListState(new ListStateDescriptorInteger(vcListState, Types.INT));}Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {// 1.来一条存到list状态里vcListState.add(value.getVc());// 2.从list状态拿出来(Iterable) 拷贝到一个List中排序 只留3个最大的IterableInteger vcListIt vcListState.get();// 2.1 拷贝到List中ListInteger vcList new ArrayList();for (Integer vc : vcListIt) {vcList.add(vc);}// 2.2 对List进行降序排序vcList.sort((o1, o2) - o2 - o1);// 2.3 只保留最大的3个(list中的个数一定是连续变大一超过3就立即清理即可)if (vcList.size() 3) {// 将最后一个元素清除第4个vcList.remove(3);}out.collect(传感器id为 value.getId() ,最大的3个水位值 vcList.toString());// 3.更新list状态vcListState.update(vcList);// vcListState.get(); //取出 list状态 本组的数据是一个Iterable
// vcListState.add(); // 向 list状态 本组 添加一个元素
// vcListState.addAll(); // 向 list状态 本组 添加多个元素
// vcListState.update(); // 更新 list状态 本组数据覆盖
// vcListState.clear(); // 清空List状态 本组数据}}).print();env.execute();}
}2.3 Map状态MapState
把一些键值对key-value作为状态整体保存起来可以认为就是一组key-value映射的列表。对应的MapStateUK, UV接口中就会有UK、UV两个泛型分别表示保存的key和value的类型。同样MapState提供了操作映射状态的方法与Map的使用非常类似。
get(UK key)传入一个key作为参数查询对应的value值put(UK key, UV value)传入一个键值对更新key对应的value值putAll(MapUK, UV map)将传入的映射map中所有的键值对全部添加到映射状态中remove(UK key)将指定key对应的键值对删除boolean contains(UK key)判断是否存在指定的key返回一个boolean值。另外MapState也提供了获取整个映射相关信息的方法IterableMap.EntryUK, UV entries()获取映射状态中所有的键值对Iterable keys()获取映射状态中所有的键key返回一个可迭代Iterable类型Iterable values()获取映射状态中所有的值value返回一个可迭代Iterable类型boolean isEmpty()判断映射是否为空返回一个boolean值。
案例需求统计每种传感器每种水位值出现的次数。
public class KeyedMapStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {MapStateInteger, Integer vcCountMapState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcCountMapState getRuntimeContext().getMapState(new MapStateDescriptorInteger, Integer(vcCountMapState, Types.INT, Types.INT));}Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {// 1.判断是否存在vc对应的keyInteger vc value.getVc();if (vcCountMapState.contains(vc)) {// 1.1 如果包含这个vc的key直接对value1Integer count vcCountMapState.get(vc);vcCountMapState.put(vc, count);} else {// 1.2 如果不包含这个vc的key初始化put进去vcCountMapState.put(vc, 1);}// 2.遍历Map状态输出每个k-v的值StringBuilder outStr new StringBuilder();outStr.append(\n);outStr.append(传感器id为 value.getId() \n);for (Map.EntryInteger, Integer vcCount : vcCountMapState.entries()) {outStr.append(vcCount.toString() \n);}outStr.append(\n);out.collect(outStr.toString());// vcCountMapState.get(); // 对本组的Map状态根据key获取value
// vcCountMapState.contains(); // 对本组的Map状态判断key是否存在
// vcCountMapState.put(, ); // 对本组的Map状态添加一个 键值对
// vcCountMapState.putAll(); // 对本组的Map状态添加多个 键值对
// vcCountMapState.entries(); // 对本组的Map状态获取所有键值对
// vcCountMapState.keys(); // 对本组的Map状态获取所有键
// vcCountMapState.values(); // 对本组的Map状态获取所有值
// vcCountMapState.remove(); // 对本组的Map状态根据指定key移除键值对
// vcCountMapState.isEmpty(); // 对本组的Map状态判断是否为空
// vcCountMapState.iterator(); // 对本组的Map状态获取迭代器
// vcCountMapState.clear(); // 对本组的Map状态清空}}).print();env.execute();}
}2.4 归约状态ReducingState
类似于值状态Value不过需要对添加进来的所有数据进行归约将归约聚合之后的值作为状态保存下来。ReducingState这个接口调用的方法类似于ListState只不过它保存的只是一个聚合值所以调用.add()方法时不是在状态列表里添加元素而是直接把新数据和之前的状态进行归约并用得到的结果更新状态。 归约逻辑的定义是在归约状态描述器ReducingStateDescriptor中通过传入一个归约函数ReduceFunction来实现的。这里的归约函数就是我们之前介绍reduce聚合算子时讲到的ReduceFunction所以状态类型跟输入的数据类型是一样的。
public ReducingStateDescriptor(String name, ReduceFunctionT reduceFunction, ClassT typeClass) {...}这里的描述器有三个参数其中第二个参数就是定义了归约聚合逻辑的ReduceFunction另外两个参数则是状态的名称和类型。 案例计算每种传感器的水位和
......
.process(new KeyedProcessFunctionString WaterSensor Integer() {private ReducingStateInteger sumVcState;Overridepublic void open(Configuration parameters) throws Exception {sumVcState this.getRuntimeContext().getReducingState(new ReducingStateDescriptorInteger(sumVcStateInteger::sumInteger.class));}Overridepublic void processElement(WaterSensor value Context ctx CollectorInteger out) throws Exception {sumVcState.add(value.getVc());out.collect(sumVcState.get());}
})2.5 聚合状态AggregatingState
与归约状态非常类似聚合状态也是一个值用来保存添加进来的所有数据的聚合结果。与ReducingState不同的是它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数AggregateFunction来定义的这也就是之前我们讲过的AggregateFunction里面通过一个累加器Accumulator来表示状态所以聚合的状态类型可以跟添加进来的数据类型完全不同使用更加灵活。 同样地AggregatingState接口调用方法也与ReducingState相同调用.add()方法添加元素时会直接使用指定的AggregateFunction进行聚合并更新状态。 案例需求计算每种传感器的平均水位
public class KeyedAggregatingStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {AggregatingStateInteger, Double vcAvgAggregatingState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcAvgAggregatingState getRuntimeContext().getAggregatingState(new AggregatingStateDescriptorInteger, Tuple2Integer, Integer, Double(vcAvgAggregatingState,new AggregateFunctionInteger, Tuple2Integer, Integer, Double() {Overridepublic Tuple2Integer, Integer createAccumulator() {return Tuple2.of(0, 0);}Overridepublic Tuple2Integer, Integer add(Integer value, Tuple2Integer, Integer accumulator) {return Tuple2.of(accumulator.f0 value, accumulator.f1 1);}Overridepublic Double getResult(Tuple2Integer, Integer accumulator) {return accumulator.f0 * 1D / accumulator.f1;}Overridepublic Tuple2Integer, Integer merge(Tuple2Integer, Integer a, Tuple2Integer, Integer b) {
// return Tuple2.of(a.f0 b.f0, a.f1 b.f1);return null;}},Types.TUPLE(Types.INT, Types.INT)));}Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {// 将 水位值 添加到 聚合状态中vcAvgAggregatingState.add(value.getVc());// 从 聚合状态中 获取结果Double vcAvg vcAvgAggregatingState.get();out.collect(传感器id为 value.getId() ,平均水位值 vcAvg);// vcAvgAggregatingState.get(); // 对 本组的聚合状态 获取结果
// vcAvgAggregatingState.add(); // 对 本组的聚合状态 添加数据会自动进行聚合
// vcAvgAggregatingState.clear(); // 对 本组的聚合状态 清空数据}}).print();env.execute();}
}2.6 状态生存时间TTL
在实际应用中很多状态会随着时间的推移逐渐增长如果不加以限制最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用.clear()方法去清除状态但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”time-to-liveTTL当状态在内存中存在的时间超出这个值时就将它清除。
具体实现上如果用一个进程不停地扫描所有状态看是否过期显然会占用大量资源做无用功。状态的失效其实不需要立即删除所以我们可以给状态附加一个属性也就是状态的“失效时间”。状态创建的时候设置 失效时间 当前时间 TTL之后如果有对状态的访问和修改我们可以再对失效时间进行更新当设置的清除条件被触发时比如状态被访问的时候或者每隔一段时间扫描一次失效状态就可以判断状态是否失效、从而进行清除了。
配置状态的TTL时需要创建一个StateTtlConfig配置对象然后调用状态描述器的.enableTimeToLive()方法启动TTL功能。
StateTtlConfig ttlConfig StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptorString stateDescriptor new ValueStateDescriptor(my state, String.class);stateDescriptor.enableTimeToLive(ttlConfig);这里用到了几个配置项
.newBuilder() 状态TTL配置的构造器方法必须调用返回一个Builder之后再调用.build()方法就可以得到StateTtlConfig了。方法需要传入一个Time作为参数这就是设定的状态生存时间。.setUpdateType() 设置更新类型。更新类型指定了什么时候更新状态失效时间这里的OnCreateAndWrite表示只有创建状态和更改状态写操作时更新失效时间。另一种类型OnReadAndWrite则表示无论读写操作都会更新失效时间也就是只要对状态进行了访问就表明它是活跃的从而延长生存时间。这个配置默认为OnCreateAndWrite。.setStateVisibility() 设置状态的可见性。所谓的“状态可见性”是指因为清除操作并不是实时的所以当状态过期之后还有可能继续存在这时如果对它进行访问能否正常读取到就是一个问题了。这里设置的NeverReturnExpired是默认行为表示从不返回过期值也就是只要过期就认为它已经被清除了应用不能继续读取这在处理会话或者隐私数据时比较重要。对应的另一种配置是ReturnExpireDefNotCleanedUp就是如果过期状态还存在就返回它的值。 除此之外TTL配置还可以设置在保存检查点checkpoint时触发清除操作或者配置增量的清理incremental cleanup还可以针对RocksDB状态后端使用压缩过滤器compaction filter进行后台清理。这里需要注意目前的TTL设置只支持处理时间。
public class StateTTLDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {ValueStateInteger lastVcState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// TODO 1.创建 StateTtlConfigStateTtlConfig stateTtlConfig StateTtlConfig.newBuilder(Time.seconds(5)) // 过期时间5s
// .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 状态 创建和写入更新 更新 过期时间.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 状态 读取、创建和写入更新 更新 过期时间.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值.build();// TODO 2.状态描述器 启用 TTLValueStateDescriptorInteger stateDescriptor new ValueStateDescriptor(lastVcState, Types.INT);stateDescriptor.enableTimeToLive(stateTtlConfig);this.lastVcState getRuntimeContext().getState(stateDescriptor);}Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {// 先获取状态值打印 》 读取状态Integer lastVc lastVcState.value();out.collect(key value.getId() ,状态值 lastVc);// 如果水位大于10更新状态值 》 写入状态if (value.getVc() 10) {lastVcState.update(value.getVc());}}}).print();env.execute();}
}3. 算子状态Operator State
算子状态Operator State就是一个算子并行实例上定义的状态作用范围被限定为当前算子任务。算子状态跟数据的key无关所以不同key的数据只要被分发到同一个并行子任务就会访问到同一个Operator State。 算子状态的实际应用场景不如Keyed State多一般用在Source或Sink等与外部系统连接的算子上或者完全没有key定义的场景。比如Flink的Kafka连接器中就用到了算子状态。 当算子的并行度发生变化时算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同重组分配的方案也会不同。 算子状态也支持不同的结构类型主要有三种ListState、UnionListState和BroadcastState。
3.1 列表状态ListState
与Keyed State中的ListState一样将状态表示为一组数据的列表。 与Keyed State中的列表状态的区别是在算子状态的上下文中不会按键key分别处理状态所以每一个并行子任务上只会保留一个“列表”list也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度彼此之间完全独立。 当算子并行度进行缩放调整时算子的列表状态中的所有元素项会被统一收集起来相当于把多个分区的列表合并成了一个“大列表”然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”round-robin与之前介绍的rebanlance数据传输方式类似是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”even-split redistribution。 算子状态中不会存在“键组”key group这样的结构所以为了方便重组分配就把它直接定义成了“列表”list。这也就解释了为什么算子状态中没有最简单的值状态ValueState。 案例实操在map算子中计算数据的个数。
public class OperatorListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.socketTextStream(hadoop102, 7777).map(new MyCountMapFunction()).print();env.execute();}// TODO 1.实现 CheckpointedFunction 接口public static class MyCountMapFunction implements MapFunctionString, Long, CheckpointedFunction {private Long count 0L;private ListStateLong state;Overridepublic Long map(String value) throws Exception {return count;}/*** TODO 2.本地变量持久化将 本地变量 拷贝到 算子状态中,开启checkpoint时才会调用** param context* throws Exception*/Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println(snapshotState...);// 2.1 清空算子状态state.clear();// 2.2 将 本地变量 添加到 算子状态 中state.add(count);}/*** TODO 3.初始化本地变量程序启动和恢复时 从状态中 把数据添加到 本地变量每个子任务调用一次** param context* throws Exception*/Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println(initializeState...);// 3.1 从 上下文 初始化 算子状态state context.getOperatorStateStore().getListState(new ListStateDescriptorLong(state, Types.LONG));// 3.2 从 算子状态中 把数据 拷贝到 本地变量if (context.isRestored()) {for (Long c : state.get()) {count c;}}}}
}3.2 联合列表状态UnionListState
与ListState类似联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于算子并行度进行缩放调整时对于状态的分配方式不同。 UnionListState的重点就在于“联合”union。在并行度调整时常规列表状态是轮询分配状态项而联合列表状态的算子则会直接广播状态的完整列表。这样并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”union redistribution。如果列表中状态项数量太多为资源和效率考虑一般不建议使用联合重组的方式。 使用方式同ListState区别在如下标红部分
state context.getOperatorStateStore().getUnionListState(new ListStateDescriptorLong(union-state, Types.LONG));3.3 广播状态BroadcastState
有时我们希望算子并行子任务都保持同一份“全局”状态用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态状态就像被“广播”到所有分区一样这种特殊的算子状态就叫作广播状态BroadcastState。 因为广播状态在每个并行子任务上的实例都一样所以在并行度调整的时候就比较简单只要复制一份到新的并行任务就可以实现扩展而对于并行度缩小的情况可以将多余的并行子任务连同状态直接砍掉——因为状态都是复制出来的并不会丢失。 案例实操水位超过指定的阈值发送告警阈值可以动态修改。
public class OperatorBroadcastStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 数据流SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction());// 配置流用来广播配置DataStreamSourceString configDS env.socketTextStream(hadoop102, 8888);// TODO 1. 将 配置流 广播MapStateDescriptorString, Integer broadcastMapState new MapStateDescriptor(broadcast-state, Types.STRING, Types.INT);BroadcastStreamString configBS configDS.broadcast(broadcastMapState);// TODO 2.把 数据流 和 广播后的配置流 connectBroadcastConnectedStreamWaterSensor, String sensorBCS sensorDS.connect(configBS);// TODO 3.调用 processsensorBCS.process(new BroadcastProcessFunctionWaterSensor, String, String() {/*** 数据流的处理方法 数据流 只能 读取 广播状态不能修改* param value* param ctx* param out* throws Exception*/Overridepublic void processElement(WaterSensor value, ReadOnlyContext ctx, CollectorString out) throws Exception {// TODO 5.通过上下文获取广播状态取出里面的值只读不能修改ReadOnlyBroadcastStateString, Integer broadcastState ctx.getBroadcastState(broadcastMapState);Integer threshold broadcastState.get(threshold);// 判断广播状态里是否有数据因为刚启动时可能是数据流的第一条数据先来threshold (threshold null ? 0 : threshold);if (value.getVc() threshold) {out.collect(value ,水位超过指定的阈值 threshold !!!);}}/*** 广播后的配置流的处理方法: 只有广播流才能修改 广播状态* param value* param ctx* param out* throws Exception*/Overridepublic void processBroadcastElement(String value, Context ctx, CollectorString out) throws Exception {// TODO 4. 通过上下文获取广播状态往里面写数据BroadcastStateString, Integer broadcastState ctx.getBroadcastState(broadcastMapState);broadcastState.put(threshold, Integer.valueOf(value));}}).print();env.execute();}
}4. 状态后端State Backends
在Flink中状态的存储、访问以及维护都是由一个可插拔的组件决定的这个组件就叫作状态后端state backend。状态后端主要负责管理本地状态的存储方式和位置。
4.1 状态后端的分类HashMapStateBackend/RocksDB
状态后端是一个“开箱即用”的组件可以在不改变应用程序逻辑的情况下独立配置。Flink中提供了两类不同的状态后端一种是“哈希表状态后端”HashMapStateBackend另一种是“内嵌RocksDB状态后端”EmbeddedRocksDBStateBackend。如果没有特别配置系统默认的状态后端是HashMapStateBackend。 1哈希表状态后端HashMapStateBackend HashMapStateBackend是把状态存放在内存里。具体实现上哈希表状态后端在内部会直接把状态当作对象objects保存在Taskmanager的JVM堆上。普通的状态以及窗口中收集的数据和触发器都会以键值对的形式存储起来所以底层是一个哈希表HashMap这种状态后端也因此得名。 2内嵌RocksDB状态后端EmbeddedRocksDBStateBackend RocksDB是一种内嵌的key-value存储介质可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend后会将处理中的数据全部放入RocksDB数据库中RocksDB默认存储在TaskManager的本地数据目录里。 RocksDB的状态数据被存储为序列化的字节数组读写操作需要序列化/反序列化因此状态的访问性能要差一些。另外因为做了序列化key的比较也会按照字节进行而不是直接调用.hashCode()和.equals()方法。 EmbeddedRocksDBStateBackend始终执行的是异步快照所以不会因为保存检查点而阻塞数据的处理而且它还提供了增量式保存检查点的机制这在很多情况下可以大大提升保存效率。
4.2 如何选择正确的状态后端
HashMap和RocksDB两种状态后端最大的区别就在于本地状态存放在哪里。 HashMapStateBackend是内存计算读写速度非常快但是状态的大小会受到集群可用内存的限制如果应用的状态随着时间不停地增长就会耗尽内存资源。 而RocksDB是硬盘存储所以可以根据可用的磁盘空间进行扩展所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化而且可能需要直接从磁盘读取数据这就会导致性能的降低平均读写性能要比HashMapStateBackend慢一个数量级。
4.3 状态后端的配置
在不做配置的时候应用程序使用的默认状态后端是由集群配置文件flink-conf.yaml中指定的配置的键名称为state.backend。这个默认配置对集群上运行的所有作业都有效我们可以通过更改配置值来改变默认的状态后端。另外我们还可以在代码中为当前作业单独配置状态后端这个配置会覆盖掉集群配置文件的默认值。 1配置默认的状态后端 在flink-conf.yaml中可以使用state.backend来配置默认状态后端。 配置项的可能值为hashmap这样配置的就是HashMapStateBackend如果配置项的值是rocksdb这样配置的就是EmbeddedRocksDBStateBackend。 下面是一个配置HashMapStateBackend的例子
# 默认状态后端
state.backend: hashmap# 存放检查点的文件路径
state.checkpoints.dir: hdfs://hadoop102:8020/flink/checkpoints这里的state.checkpoints.dir配置项定义了检查点和元数据写入的目录。
2为每个作业Per-job/Application单独配置状态后端 通过执行环境设置HashMapStateBackend。
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new HashMapStateBackend());通过执行环境设置EmbeddedRocksDBStateBackend。
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new EmbeddedRocksDBStateBackend());需要注意如果想在IDE中使用EmbeddedRocksDBStateBackend需要为Flink项目添加依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-rocksdb/artifactIdversion${flink.version}/version
/dependency而由于Flink发行版中默认就包含了RocksDB(服务器上解压的Flink)所以只要我们的代码中没有使用RocksDB的相关内容就不需要引入这个依赖。