当前位置: 首页 > news >正文

网站怎么做跳转链接深圳电商网站公司

网站怎么做跳转链接,深圳电商网站公司,网站建设案例市场,吴中seo页面优化推广文章目录 前言一、状态分类二、keyed代码示例ListStateMapState 总结 前言 状态在Flink中叫做State#xff0c;用来保存中间计算结果或者缓存数据。要做到比较好的状态管理#xff0c;需要考虑以下几点内容#xff1a; 状态数据的存储和访问 在Task内部#xff0c;如何高… 文章目录 前言一、状态分类二、keyed代码示例ListStateMapState 总结 前言 状态在Flink中叫做State用来保存中间计算结果或者缓存数据。要做到比较好的状态管理需要考虑以下几点内容 状态数据的存储和访问 在Task内部如何高效地保存状态数据和使用状态数据。状态数据的备份和恢复 作业失败是无法避免的那么就要考虑如何高效地将状态数据保存下来避免状态备份降低集群的吞吐量并且在Failover时恢复作业到失败前的状态。状态数据的划分和动态扩容 作业在集群内并行执行那么就要思考对于作业的Task而言如何使用统一的方式对状态数据进行切分在作业修改并行度导致Task数据改变的时候如何确保正确地恢复。 一、状态分类 State按照是否有Key划分KeyedState和OperatorState两种。按照数据结构不同flink定义了多种state分别应用于不同的场景具体实现如下ValueState、ListState、MapState、ReducingState、AggregatingState。 ValueState: 保存一个可以更新和检索的值如上所述每个值都对应到当前的输入数据的 key因此算子接收到的每个 key 都可能对应一个值。 这个值可以通过 update(T) 进行更新通过 T value() 进行检索。 ListState: 保存一个元素的列表。可以往这个列表中追加数据并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。 ReducingState: 保存一个单值表示添加到状态的所有值的聚合。接口与 ListState 类似但使用 add(T) 增加元素会使用提供的 ReduceFunction 进行聚合。 AggregatingStateIN, OUT: 保留一个单值表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。 MapStateUK, UV: 维护了一个映射列表。 你可以添加键值对到状态中也可以获得反映当前所有映射的迭代器。使用 put(UKUV) 或者 putAll(MapUKUV) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries()keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。 二、keyed代码示例 更多代码示例请下载Flink State体系剖析以及案例实践 ListState 代码如下 import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** 需求当接收到的相同 key 的元素个数等于 3个就计算这些元素的 value 的平均值。* 计算keyed stream中每3个元素的 value 的平均值*/ public class TestKeyedStateMain {public static void main(String[] args) throws Exception{//获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度env.setParallelism(12);//获取数据源DataStreamSourceTuple2Long, Long dataStreamSource env.fromElements(Tuple2.of(1L, 3L),Tuple2.of(1L, 7L),Tuple2.of(2L, 4L),Tuple2.of(1L, 5L),Tuple2.of(2L, 2L),Tuple2.of(2L, 6L));/*** 1L, 3L* 1L, 7L* 1L, 5L** 1L,5.0 double** 2L, 4L* 2L, 2L* 2L, 6L** 2L,4.0 double***/// 输出//(1,5.0)//(2,4.0)dataStreamSource.keyBy(tuple - tuple.f0) //分组.flatMap(new CountAverageWithListState()).print();env.execute(TestStatefulApi);} }import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; import org.apache.flink.util.Collector;import java.util.Collections; import java.util.List;/*** ListStateT 这个状态为每一个 key 保存集合的值* get() 获取状态值* add() / addAll() 更新状态值将数据放到状态中* clear() 清除状态*/ public class CountAverageWithListStateextends RichFlatMapFunctionTuple2Long, Long, Tuple2Long, Double {// managed keyed state/*** ValueState : 里面只能存一条元素* ListState 里面可以存很多数据*/private ListStateTuple2Long, Long elementsByKey;Overridepublic void open(Configuration parameters) throws Exception {// 注册状态ListStateDescriptorTuple2Long, Long descriptor new ListStateDescriptorTuple2Long, Long(average, // 状态的名字Types.TUPLE(Types.LONG, Types.LONG)); // 状态存储的数据类型elementsByKey getRuntimeContext().getListState(descriptor);}Overridepublic void flatMap(Tuple2Long, Long element,CollectorTuple2Long, Double out) throws Exception {// 拿到当前的 key 的状态值IterableTuple2Long, Long currentState elementsByKey.get();// 如果状态值还没有初始化则初始化if (currentState null) {elementsByKey.addAll(Collections.emptyList());}// 更新状态elementsByKey.add(element);// 判断如果当前的 key 出现了 3 次则需要计算平均值并且输出ListTuple2Long, Long allElements Lists.newArrayList(elementsByKey.get());if (allElements.size() 3) {long count 0;long sum 0;for (Tuple2Long, Long ele : allElements) {count;sum ele.f1;}double avg (double) sum / count;out.collect(Tuple2.of(element.f0, avg));// 清除状态elementsByKey.clear();}} }MapState import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; import org.apache.flink.util.Collector;import java.util.List; import java.util.UUID;/*** MapStateK, V 这个状态为每一个 key 保存一个 Map 集合* put() 将对应的 key 的键值对放到状态中* values() 拿到 MapState 中所有的 value* clear() 清除状态*/ public class CountAverageWithMapStateextends RichFlatMapFunctionTuple2Long, Long, Tuple2Long, Double {// managed keyed state//1. MapState key 是一个唯一的值value 是接收到的相同的 key 对应的 value 的值/*** MapState:* Map集合的特点相同key会覆盖数据。*/private MapStateString, Long mapState;Overridepublic void open(Configuration parameters) throws Exception {// 注册状态MapStateDescriptorString, Long descriptor new MapStateDescriptorString, Long(average, // 状态的名字String.class, Long.class); // 状态存储的数据类型mapState getRuntimeContext().getMapState(descriptor);}/**** param element* param out* throws Exception*/Overridepublic void flatMap(Tuple2Long, Long element,CollectorTuple2Long, Double out) throws Exception {mapState.put(UUID.randomUUID().toString(), element.f1); //list// 判断如果当前的 key 出现了 3 次则需要计算平均值并且输出ListLong allElements Lists.newArrayList(mapState.values());if (allElements.size() 3) {long count 0;long sum 0;for (Long ele : allElements) {count;sum ele;}double avg (double) sum / count;//out.collect(Tuple2.of(element.f0, avg));// 清除状态mapState.clear();}} }总结 是否存在当前处理的 keycurrent keyoperator state 是没有当前 key 的概念而 keyed state 的数值总是与一个 current key 对应。存储对象是否 on heap: 目前 operator state backend 仅有一种 on-heap 的实现而 keyed state backend 有 on-heap 和 off-heapRocksDB的多种实现。是否需要手动声明快照snapshot和恢复 (restore) 方法operator state 需要手动实现 snapshot 和 restore 方法而 keyed state 则由 backend 自行实现对用户透明。数据大小一般而言我们认为 operator state 的数据规模是比较小的认为 keyed state 规模是 相对比较大的。需要注意的是这是一个经验判断不是一个绝对的判断区分标准。 更多内容和代码示例请下载Flink State体系剖析以及案例实践
http://www.zqtcl.cn/news/325200/

相关文章:

  • 辽宁智能建站系统价格金融做市场广告挂哪些网站
  • 做外贸的有哪些网站互动平台游戏
  • 网站设计最好的公司idc网站模板源码下载
  • 网站建设历史视频制作软件有哪些
  • 加盟网站制作定制桥的设计网站建设
  • 深圳做宣传网站的公司开发电商网站多少钱
  • 自适应网站建设公司什么是网站死链
  • 自己给网站做支付接口wordpress elementor
  • 中国最新军事新闻网站优化推广
  • 有没有做3d衣服模型网站php网站开发目的
  • 东莞网站建设方案咨询wordpress易企秀
  • 漳诈网站建设免费的企业网站建设
  • 广州番禺区有什么好玩的地方优化软件有哪些
  • 面包机做面包网站wordpress获取用户注册时间
  • 福州网站建设个人兼职泰州seo排名扣费
  • 泰安北京网站建设公司个人自我介绍网页
  • 网站建设适应全屏如何自动深圳市哪里最繁华
  • 杭州网站推广公司阿里云wordpress 安装目录
  • 厦门优秀网站建设app项目开发流程
  • 工作设计室网站海外网站代理
  • 室内设计官方网站没网站怎么做cpa
  • 哪个网站做欧洲旅游攻略好wordpress编辑器字体大小
  • aspcms 手机网站wordpress 刷浏览量
  • dw网站首页的导航怎么做网站建设企业建站模板
  • 平台型网站建设网站关键词优化seo
  • 齿轮机械东莞网站建设技术支持热搜词排行榜关键词
  • 河南专业做网站网站推广优化c重庆
  • 温州网站建设钱建设工程公司网站
  • 做笑话网站全国大学生职业生涯规划大赛官网
  • 便宜购 网站建设平台推广引流怎么做