当前位置: 首页 > news >正文

专业网站设计制作价格网站建设 域名 空间

专业网站设计制作价格,网站建设 域名 空间,网站后台管理系统怎么做的,网站建设投标书服务方案范本文章目录 前言BroadcastStream代码示例Broadcast 使用注意事项 前言 Flink中的广播流#xff08;BroadcastStream#xff09;是一种特殊的流处理方式#xff0c;它允许将一个流#xff08;通常是一个较小的流#xff09;广播到所有的并行任务中#xff0c;从而实现在不同… 文章目录 前言BroadcastStream代码示例Broadcast 使用注意事项 前言 Flink中的广播流BroadcastStream是一种特殊的流处理方式它允许将一个流通常是一个较小的流广播到所有的并行任务中从而实现在不同任务间共享数据的目的。广播流在处理配置信息、小数据集或者全局变量等场景下特别有用因为这些数据需要在所有任务中保持一致且实时更新。 广播流的使用通常涉及以下步骤 定义MapStateDescriptor首先需要定义一个MapStateDescriptor来描述要广播的数据的格式。这个描述器指定了数据的键值对类型。 创建广播流然后需要将一个普通的流转换为广播流。这通常通过调用流的broadcast()方法实现并将MapStateDescriptor作为参数传入。 连接广播流与非广播流一旦有了广播流就可以将其与一个或多个非广播流无论是Keyed流还是Non-Keyed流连接起来。这通过调用非广播流的connect()方法完成并将广播流作为参数传入。连接后的流是一个BroadcastConnectedStream它提供了process()方法用于处理数据。 处理数据在process()方法中可以编写逻辑来处理非广播流和广播流的数据。根据非广播流的类型Keyed或Non-Keyed需要传入相应的KeyedBroadcastProcessFunction或BroadcastProcessFunction类型的处理函数。 广播流的一个典型使用场景是在处理数据时需要实时动态改变配置。例如当需要从MySQL数据库中实时查询和更新某些关键字过滤规则时如果直接在计算函数中进行查询可能会阻塞整个计算过程甚至导致任务停止。通过使用广播流可以将这些配置信息广播到所有相关任务的实例中然后在计算过程中直接使用这些配置信息从而提高计算效率和实时性。 总的来说Flink的广播流提供了一种有效的方式来实现不同任务间的数据共享和实时更新适用于各种需要全局数据或配置的场景。 BroadcastStream代码示例 功能将用户信息进行广播从Kafka中读取用户访问记录判断访问用户是否存在 import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties;import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema; import org.apache.flink.util.Collector;import flink.demo.data.UserVo; /*** 多流connect,并进行join**/ public class BroadcastTest{public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();Properties proterties new Properties();proterties.setProperty(bootstrap.servers, 10.168.88.88:9092);proterties.setProperty(group.id, test);proterties.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);proterties.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer); // proterties.setProperty(auto.offset.reset, latest);FlinkKafkaConsumerObjectNode consumerVisit new FlinkKafkaConsumer(test,new JSONKeyValueDeserializationSchema(false), proterties);DataStreamSourceObjectNode streamSource env.addSource(consumerVisit);DataStreamSourceTuple2String, ListUserVo userStreamSource env.addSource(new UserListSource());MapStateDescriptorString, ListUserVo descriptor new MapStateDescriptor(userStream,BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHintListUserVo() {}));BroadcastStreamTuple2String, ListUserVo broadcastStream userStreamSource.broadcast(descriptor);// 将数据流和控制流进行连接利用控制流中的数据来控制字符串的输出BroadcastConnectedStreamObjectNode, Tuple2String, ListUserVo tmpstreamSource.connect(broadcastStream);tmp.process(new UserPvProcessor()).print();env.execute(kafkaTest);}private static class UserPvProcessorextends BroadcastProcessFunctionObjectNode, Tuple2String, ListUserVo, String {private static final long serialVersionUID 1L;MapStateDescriptorString, ListUserVo descriptor new MapStateDescriptor(userStream,BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHintListUserVo() {}));Override//用户信息处理public void processBroadcastElement(Tuple2String, ListUserVo value, Context ctx, CollectorString out)throws Exception {// 将接收到的控制数据放到 broadcast state 中 ctx.getBroadcastState(descriptor).put(value.f0, value.f1);// 打印控制信息System.out.println(Thread.currentThread().getName() 接收到用户信息 value.f0 value.f1);}Override//数据流public void processElement(ObjectNode element, ReadOnlyContext ctx, CollectorString out) throws Exception {// 从 broadcast state 中拿到用户列表信息ListUserVo userList ctx.getBroadcastState(descriptor).get(userList);String timeLocalDateTime.now().format(DateTimeFormatter.ofPattern(HH:mm:ss));if(userList!nulluserList.size()0) {MapString,String userMapnew HashMap();for(UserVo vo:userList) {userMap.put(vo.getUserid(), vo.getUserName());} // System.out.println(userMap);JsonNode value element.get(value);String useridvalue.get(user).asText();String userNameuserMap.get(userid);if (StringUtils.isNotBlank(userName)) {out.collect(Thread.currentThread().getName()存在用户userid userName time);}else {out.collect(Thread.currentThread().getName()不存在用户userid time );}}else {out.collect(Thread.currentThread().getName()不存在用户element.get(value) time );}}} }Broadcast 使用注意事项 同一个 operator 的各个 task 之间没有通信广播流侧processBroadcastElement可以能修改 broadcast state而数据流侧processElement只能读 broadcast state.需要保证所有 Operator task 对 broadcast state 的修改逻辑是相同的否则会导致非预期的结果Operator tasks 之间收到的广播流元素的顺序可能不同虽然所有元素最终都会下发给下游tasks但是元素到达的顺序可能不同所以更新state时不能依赖元素到达的顺序每个 task 对各自的 Broadcast state 都会做快照防止热点问题目前不支持 RocksDB 保存 Broadcast stateBroadcast state 目前只保存在内存中需要为其预留合适的内存
http://www.zqtcl.cn/news/164119/

相关文章:

  • 提供企业网站建设价格10元一年的虚拟主机
  • 塔城建设局网站电子商务网站建设方案目录
  • 网站容易被百度收录个人建购物网站怎么备案
  • 中文网站什么意思wordpress电脑访问不了
  • 杨家坪网站建设企业生产erp软件公司
  • 网站模块设计软件河北seo优化_网络建设营销_网站推广服务 - 河北邢台seo
  • 陕西正天建设有限公司网站西安专业网页制作
  • 网站建设工作室介绍范文seo网站排名的软件
  • 上海网站建设-网建知识可编辑个人简历模板
  • 北京新鸿儒做的网站shopify做国内网站
  • 网站怎样做百度推广机关门户网站建设要求
  • 好看的网站后台模板沧州网站群
  • 深圳做网站排名公司哪家好哪些网站seo做的好
  • 国内网站建设推荐网站建设合同标准版
  • 哈尔滨网站制作费用企业成品网站模板
  • 网络广告网站怎么做北京海淀建设中路哪打疫苗
  • 房地产公司网站制作电影发布网站模板
  • 如何利用开源代码做网站网站本科
  • 公司是做小程序还是做网站宜宾住房与城乡建设部网站
  • 做网站哪个公司最社区问答网站开发
  • 网站引量方法网站建设推广页
  • 书店网站的建设网络营销方法有哪些
  • 深圳网站优化软件顺企网怎么样
  • 做网站的需要什么要求中国五百强企业排名表
  • 网络营销 企业网站外贸响应式网站建设
  • 网站网页制作公司o2o平台是什么意思啊
  • 惠州市网站建设个人网站怎么进入后台维护
  • 微信网站链接怎么做wordpress 绑定手机版
  • 网站建设的内容是什么在线阅读小说网站怎么建设
  • 福州网站开发哪家比较好建设网站需要掌握什么编程语言