jsp网站开发实例与发布,手机版企业网站h5,wordpress任务插件,个人房产信息网查询网签备案信息1 Flink中的状态 当数据流中的许多操作只查看一个每次事件(如事件解析器)#xff0c;一些操作会跨多个事件的信息(如窗口操作)。这些操作称为有状态。状态由一个任务维护#xff0c;并且用来计算某个结果的所有数据#xff0c;都属于这个任务的状态。可以简单的任务状态就是…1 Flink中的状态 当数据流中的许多操作只查看一个每次事件(如事件解析器)一些操作会跨多个事件的信息(如窗口操作)。这些操作称为有状态。状态由一个任务维护并且用来计算某个结果的所有数据都属于这个任务的状态。可以简单的任务状态就是一个本地变量可以被任务的业务逻辑访问。 有些算子有些任务是没有状态的如map操作只跟输入数据有关。像窗口操作不管是增量窗口函数还是全窗口函数都要保持里面的信息的一开始在窗口到达结束时间之前是不输出数据的所以最后输出数据的时候他的计算是要依赖之前的全窗口可以认为是把所有数据都作为状态保存下来。增量聚合窗口来一个聚合一次要保存的是中间聚合状态。像ProcessFunction可以有状态也可以没有状态。 无状态流处理和有状态流处理的主要区别无状态流处理分别接收每条输入数据根据最新输入的数据生成输出数据有状态流处理会维护状态根据每条输入记录进行更新并基于最新输入的记录和当前的状态值生成输出记录即综合考虑多个事件之后的结果。
需要状态操作的一些例子如下
应用程序搜索某些事件模式时状态将存储迄今遇到的事件序列。每分钟/小时/天聚合事件时将状态保存挂起的聚合。在数据流上训练机器学习模型时状态保存模型参数的当前版本。需要管理历史数据时状态允许有效访问过去发生的事件。
2 状态类型 每个状态都是当前任务去管理维护每个状态都是和当前算子关联在一起的如果需要Flink真正的把他管理起来的话在运行时的时候Flink就必须要知道当前状态定义的类型是什么所以一开始必须注册对应的状态要有所谓的描述器。Flink有两种基本的状态Operator State算子状态和Keyed State键控状态他们的主要区别就是作用范围不一样算子状态的作用范围就是限定为算子任务也就是当前一个分区执行的时候所有数据来了都能访问到状态。键控状态中并不是当前分区所有的数据都能访问所有的状态而是按照keyby之后的key做划分当前key只能访问自己的状态
2.1 Operator State 每个算子状态绑定到一个并行算子实例作用范围限定为算子任务同一并行任务的状态是共享的并行处理的所有数据都可以访问到相同的状态。Kafka Connector就是使用算子状态的很好的一个例子Kafka consumer的每个并行实例都维护一个主题分区和偏移作为算子状态。当并行性发生变化时算子状态接口支持在并行运算符实例之间重新分配状态。可以有不同的方案来进行这种再分配。 因为同一个并行任务处理的所有数据都可以访问到当前的状态所以就相当于本地变量 算子状态有3种基本数据结构①列表状态List state状态表示为一组数据的列表②联合列表状态Union list state也将状态表示为数据的列表。它与常规列表状态的区别在于在发生故障时或者从保存点savepoint启动应用程序时如何恢复。③广播状态Broadcast state如果一个算子有多项任务而它的每项任务状态又都相同那么这种特殊情况最适合应用广播状态。那就可以访问到别的并行子任务的状态。 算子状态运用的时候可能应用场景没那么多一般都是keyby之后根据不同的key做分区讨论。如果所有数据来了全部统一处理的话一般还要划分成不同的状态要保存为链表并行度调整的时候可以根据这个列表拆开做进一步调整。 联合列表状态与列表状态的区别主要是并行度调整状态怎样重新分配列表状态本身分配的时候直接分配联合列表状态的话就是把所有元素都联合起来然后由每个任务自己定义最后留下哪些也就是自己截取要哪一部分。
2.2 Keyed State Keyed State只能在KeyedStream后使用键控状态总是相对于键根据键来维护和访问的 Keyed State很类似于一个分布式的key-value map数据结构只能用于KeyedStreamkeyBy算子处理之后。键控状态基于每个key去管理一般keyby进行HashCode重分区后基于它自己独享的内存空间就会针对每一个不同的key分别保存一份独立的存储状态而且接下来来了一个新的数据只能访问自己的状态不能访问其他key的Flink会为每一个key维护一个状态。 Flink的Keyed State支持的数据类型如下
序号类型说明方法1ValueState[T]用来保存单个的值ValueState.update(value: T)ValueState.value()2ListState[T]保存一个列表ListState.add(value: T)ListState.addAll(values: java.util.List[T])ListState.update(values: java.util.List[T])ListState.get()注意返回的是Iterable[T]3MapState[K, V]保存Key-Value对MapState.get(key: K)MapState.put(key: K, value: V)MapState.contains(key: K)MapState.remove(key: K)4ReducingState[T]保留一个值该值表示添加到状态的所有值的汇总需要用户提供ReduceFunctionReducingState.add(value: T)ReducingState.get()5AggregatingState[I, O]保留一个值该值表示添加到状态的所有值的汇总需要用户提供AggregateFunctionAggregatingState.add(value: T)AggregatingState.get()6FoldingStateT, ACC保留一个值该值表示添加到状态的所有值的汇总需要用户提供FoldFunctionAggregatingState.add(value: T)AggregatingState.get()每个状态都有clear()是清空操作。 在进行状态编程时需要通过RuntimeContext注册StateDescriptor。StateDescriptor以状态state的名字和存储的数据类型为参数。案例如下
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {private var sum: ValueState[(Long, Long)] _override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit {// access the state valueval tmpCurrentSum sum.value// If it hasnt been used before, it will be nullval currentSum if (tmpCurrentSum ! null) {tmpCurrentSum} else {(0L, 0L)}// update the countval newSum (currentSum._1 1, currentSum._2 input._2)// update the statesum.update(newSum)// if the count reaches 2, emit the average and clear the stateif (newSum._1 2) {out.collect((input._1, newSum._2 / newSum._1))sum.clear()}}override def open(parameters: Configuration): Unit {sum getRuntimeContext.getState(new ValueStateDescriptor[(Long, Long)](average, createTypeInformation[(Long, Long)]))}
}object ExampleCountWindowAverage extends App {val env StreamExecutionEnvironment.getExecutionEnvironmentenv.fromCollection(List((1L, 3L),(1L, 5L),(1L, 7L),(1L, 4L),(1L, 2L))).keyBy(_._1).flatMap(new CountWindowAverage()).print()// the printed output will be (1,4) and (1,5)env.execute(ExampleManagedState)
}声明状态操作为 sum getRuntimeContext.getState(new ValueStateDescriptor[(Long, Long)](average, createTypeInformation[(Long, Long)]))读取状态为 val tmpCurrentSum sum.value更新状态为 sum.update(newSum)3 状态后端 Flink提供不同的State Backends状态后端指定如何和在何处存储状态。 1MemoryStateBackend 状将键控状态作为内存中的对象进行管理将它们存储在TaskManager的JVM堆上将checkpoint存储在JobManager的内存中 2FsStateBackend 本地状态存在TaskManager的JVM堆上checkpoint存到远程的持久化文件系统FileSystem上 3RocksDBStateBackend 将所有状态序列化后存入本地的RocksDB中存储。 设置状态后端如下
val env StreamExecutionEnvironment.getExecutionEnvironment()
//val checkpointPath: String checkpoint_Path
//val backend new RocksDBStateBackend(checkpointPath)
//env.setStateBackend(backend)env.setStateBackend(new FsStateBackend(YOUR_PATH))
env.enableCheckpointing(1000)
// 配置重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(60, Time.of(10, TimeUnit.SECONDS)))