当前位置: 首页 > news >正文

青岛市建设厅网站oppo应用商店官网

青岛市建设厅网站,oppo应用商店官网,北京最大的软件开发公司,移动互联网开发导语#xff1a;Flink是一个对有界和无界数据流进行状态计算的分布式处理引擎和框架#xff0c;主要用来处理流式数据。它既可以处理有界的批量数据集#xff0c;也可以处理无界的实时流数据#xff0c;为批处理和流处理提供了统一编程模型。 维度表可以看作是用户来分析数…导语Flink是一个对有界和无界数据流进行状态计算的分布式处理引擎和框架主要用来处理流式数据。它既可以处理有界的批量数据集也可以处理无界的实时流数据为批处理和流处理提供了统一编程模型。 维度表可以看作是用户来分析数据的窗口它区别于事实表业务真实发生的数据通常用来表示业务属性以便为分析者提供有用的信息。在实际场景中由于数据是实时变化的因此需要通过将维度表进行关联来保证业务的时效性和稳定性。本文主要围绕Flink维度表关联方案进行论述分析不同关联方案的作用和特点与各位读者共飨。 维度表与事实表的关联是数据分析中常见的一种分析方式在传统数仓系统中由于数据是有界的因此关联实现相对简单。但是在实时系统或实时数仓中数据是无界的关联时需要考虑的问题就会复杂很多如数据迟到导致的关联结果不准确、缓存数据消耗资源过大等等。 在典型的实时系统中维表数据一般来源于源系统的OLTP数据库中采用CDC技术将维表数据实时采集到Kafka或其他消息队列最后保存到HBase、Hudi、Redis等组件中供数据分析使用。一个比较常见的架构图如下 Flink维度表关联有多种方案包括实时lookup数据库关联、预加载维表关联、广播维度表、Temporal Table Function Join等。每种方案都有各自的特点需要结合实际情况综合判断维表关联方案主要考虑的因素有如下几个方面 ■ 实现复杂度实现维表关联复杂度越低越好 ■ 数据库负载随着事实表数据量增大数据库吞吐量能否满足数据库负载能否支撑 ■ 维表更新实时性维表更新后新的数据能否及时被应用到 ■ 内存消耗是否占用太多内存 ■ 横向扩展随着数据量增大能否横向扩展 ■ 结果确定性结果的正确性是否能够保证 01 实时lookup数据库关联 所谓实时lookup数据库关联就是在用户自定义函数中通过关联字段直接访问数据库实现关联的方式。每条事实表数据都会根据关联键到存储维度表的数据库中查询一次。 实时lookup数据库关联的特点是实现简单但数据库压力较大无法支撑大数据量的维度数据查询并且在查询时只能根据当时的维度表数据查询如果事实表数据重放或延迟查询结果的正确性无法得到保证且多次查询结果可能不一致。 实时lookup数据库关联还可以再细分为三种方式同步lookup数据库关联、异步lookup数据库关联和带缓存的数据库lookup关联。 1.1 同步lookup数据库关联 同步实时数据库lookup关联实现最简单只需要在一个RichMapFunction或者RichFlat-MapFunction中访问数据库处理好关联逻辑后将结果数据输出即可。上游每输入一条数据就会前往外部表中查询一次等待返回后输出关联结果。 同步lookup数据库关联的参考代码如下 创建类并继承RichMapFunction抽象类。 public class HBaseMapJoinFun extends RichMapFunctionTuple2String,String,Tuple3String,String,String { 在open方法中实现连接数据库该数据库存储了维度表信息。 public void open(Configuration parameters) throws Exception {org.apache.hadoop.conf.Configuration hconf HBaseConfiguration.create();InputStream hbaseConf DimSource.class.getClassLoader().getResourceAsStream(hbase-site.xml);InputStream hdfsConf DimSource.class.getClassLoader().getResourceAsStream(hdfs-site.xml);InputStream coreConf DimSource.class.getClassLoader().getResourceAsStream(core-site.xml);hconf.addResource(hdfsConf);hconf.addResource(hbaseConf);hconf.addResource(coreConf);if (User.isHBaseSecurityEnabled(hconf)){String userName dl_rt;String keyTabFile /opt/kerberos/kerberos-keytab/keytab;LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userName, keyTabFile);}else {LOG.error(conf load error!);}connection ConnectionFactory.createConnection(hconf); } 在map方法中实现关联操作并返回结果。 Override public Tuple3String, String, String map(Tuple2String, String stringStringTuple2) throws Exception LOG.info(Search hbase data by key .);String row_key stringStringTuple2.f1;String p_name stringStringTuple2.f0;byte[] familyName Bytes.toBytes(cf);byte[] qualifier Bytes.toBytes(city_name);byte[] rowKey Bytes.toBytes(row_key);table connection.getTable(TableName.valueOf(table_name));Get get new Get(rowKey);get.addColumn(familyName,qualifier);Result result table.get(get);for (Cell cell : result.rawCells()){LOG.info({}:{}:{},Bytes.toString(CellUtil.cloneRow(cell)),Bytes.toString(CellUtil.cloneFamily(cell)),Bytes.toString(CellUtil.cloneQualifier(cell)),Bytes.toString(CellUtil.cloneValue(cell)));}String cityName Bytes.toString(result.getValue(Bytes.toBytes(cf),Bytes.toBytes(city_name)));return new Tuple3String, String, String(row_key,p_name,cityName); } 在主类中调用。 //关联维度表 SingleOutputStreamOperatorTuple3String,String,String resultStream dataSource.map(new HBaseMapJoinFun()); resultStream.print().setParallelism(1); 1.2 异步lookup数据库关联 异步实时数据库lookup关联需要借助AsyncIO来异步访问维表数据。AsyncIO可以充分利用数据库提供的异步Client库并发处理lookup请求提高Task并行实例的吞吐量。 相较于同步lookup异步方式可大大提高数据库查询的吞吐量但相应的也会加大数据库的负载并且由于查询只能查当前时间点的维度数据因此可能造成数据查询结果的不准确。 AsyncIO提供lookup结果的有序和无序输出由用户自己选择是否保证event的顺序。 示例代码参考如下 创建Join类并继承RichAsyncFunction抽象类。 public class HBaseAyncJoinFun extends RichAsyncFunctionTuple2String,String, Tuple3String,String,String { 在open方法中实现连接数据库存储了维度表的信息。 public void open(Configuration parameters) throws Exception {org.apache.hadoop.conf.Configuration hconf HBaseConfiguration.create();InputStream hbaseConf DimSource.class.getClassLoader().getResourceAsStream(hbase-site.xml);InputStream hdfsConf DimSource.class.getClassLoader().getResourceAsStream(hdfs-site.xml);InputStream coreConf DimSource.class.getClassLoader().getResourceAsStream(core-site.xml);hconf.addResource(hdfsConf);hconf.addResource(hbaseConf);hconf.addResource(coreConf);if (User.isHBaseSecurityEnabled(hconf)){String userName dl_rt;String keyTabFile /opt/kerberos/kerberos-keytab/keytab;LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userName, keyTabFile);}else {LOG.error(conf load error!);}final ExecutorService threadPool Executors.newFixedThreadPool(2,new ExecutorThreadFactory(hbase-aysnc-lookup-worker, Threads.LOGGING_EXCEPTION_HANDLER));try{connection ConnectionFactory.createAsyncConnection(hconf).get();tableconnection.getTable(TableName.valueOf(table_name),threadPool);}catch (InterruptedException | ExecutionException e){LOG.error(Exception while creating connection to HBase.,e);throw new RuntimeException(Cannot create connection to HBase.,e);} 在AsyncInvoke方法中实现异步关联并返回结果。 Override public void asyncInvoke(Tuple2String, String input, ResultFutureTuple3String, String, String resultFuture) throws Exception {LOG.info(Search hbase data by key .);String row_key input.f1;String p_name input.f0;byte[] familyName Bytes.toBytes(cf);byte[] qualifier Bytes.toBytes(city_name);byte[] rowKey Bytes.toBytes(row_key);Get get new Get(rowKey);get.addColumn(familyName,qualifier);CompletableFutureResult responseFuture table.get(get);responseFuture.whenCompleteAsync((result, throwable) - {if (throwable ! null){if (throwable instanceof TableNotFoundException){LOG.error(Table {} not found, table_name,throwable);resultFuture.completeExceptionally(new RuntimeException(HBase table table_name not found.,throwable));}else {LOG.error(String.format(HBase asyncLookup error,retry times %d,1),throwable);responseFuture.completeExceptionally(throwable);}}else{List list new ArrayListTuple3String, String, String();if (result.isEmpty()){String cityName;list.add(new Tuple3String,String,String(row_key,p_name,cityName));resultFuture.complete(list);}else{String cityName Bytes.toString(result.getValue(Bytes.toBytes(cf),Bytes.toBytes(city_name)));list.add(new Tuple3String,String,String(row_key,p_name,cityName));resultFuture.complete(list);}}});} 在主方法中调用。 //异步关联维度表 DataStreamTuple3String,String,String unorderedResult AsyncDataStream.unorderedWait(dataSource, new HBaseAyncJoinFun(),5000L, TimeUnit.MILLISECONDS,2).setParallelism(2); unorderedResult.print(); 此处使用unorderedWait方式允许返回结果存在乱序。 1.3 带缓存的数据库lookup关联 带缓存的数据库lookup关联是对上述两种方式的优化通过增加缓存机制来降低查询数据库的请求数量而且缓存不需要通过 Checkpoint 机制持久化可以采用本地缓存例如Guava Cache可以比较轻松的实现。 此种方式的问题在于缓存的数据无法及时更新可能会造成关联数据不正确的问题。 02 预加载维表关联 预加载维表关联是在作业启动时就把维表全部加载到内存中因此此种方式只适用于维度表数据量不大的场景。相较于lookup方式预加载维表可以获得更好的性能。 预加载维表关联还可以再细分为四种方式启动时预加载维表、启动时预加载分区维表、启动时预加载维表并定时刷新和启动时预加载维表并实时lookup数据库。 预加载维表的各种细分方案可根据实际应用场景进行结合应用以此来满足不同的场景需求。 2.1 启动时预加载维表 启动时预加载维表实现比较简单作业初始化时在用户函数的open方法中读取数据库的维表数据放到内存中且缓存的维表数据不作为State每次重启时open方法都被再次执行从而加载新的维表数据。 此方法需要占用内存来存储维度表数据不支持大数据量的维度表且维度表加载入内存后不能实时更新因此只适用于对维度表更新要求不高且数据量小的场景。 2.2 启动时预加载分区维表 对于维表比较大的情况可以在启动预加载维表基础之上增加分区功能。简单来说就是将数据流按字段进行分区然后每个Subtask只需要加在对应分区范围的维表数据。此种方式一定要自定义分区不要用KeyBy。 2.3 启动时预加载维表并定时刷新 预加载维度数据只有在Job启动时才会加载维度表数据这会导致维度数据变更无法被识别在open方法中初始化一个额外的线程来定时更新内存中的维度表数据可以一定程度上缓解维度表更新问题但无法彻底解决。 示例代码参考如下 public class ProLoadDimMap extends RichMapFunctionTuple2String,Integer,Tuple2String,String {private static final Logger LOG LoggerFactory.getLogger(ProLoadDimMap.class.getName());ScheduledExecutorService executor null;private MapString,String cache;Overridepublic void open(Configuration parameters) throws Exception {executor.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {load();} catch (Exception e) {e.printStackTrace();}}},5,5, TimeUnit.MINUTES);//每隔 5 分钟拉取一次维表数据}Overridepublic void close() throws Exception {}Overridepublic Tuple2String, String map(Tuple2String, Integer stringIntegerTuple2) throws Exception {String username stringIntegerTuple2.f0;Integer city_id stringIntegerTuple2.f1;String cityName cache.get(city_id.toString());return new Tuple2String,String(username,cityName);}public void load() throws Exception {Class.forName(com.mysql.jdbc.Driver);Connection con DriverManager.getConnection(jdbc:mysql://172.XX.XX.XX:XX06/yumd?useSSLfalsecharacterEncodingUTF-8, root, Root123);PreparedStatement statement con.prepareStatement(select city_id,city_name from city_dim;);ResultSet rs statement.executeQuery();//全量更新维度数据到内存while (rs.next()) {String cityId rs.getString(city_id);String cityName rs.getString(city_name);cache.put(cityId, cityName);}con.close();} } 2.4 启动时预加载维表并实时lookup数据库 此种方案就是将启动预加载维表和实时look两种方式混合使用将预加载的维表作为缓存给实时lookup使用未命中则到数据库里查找。该方案可解决关联不上的问题。 03 广播维度表 广播维度表方案是将维度表数据用流的方式接入Flink Job 程序并将维度表数据进行广播再与事件流数据进行关联此种方式可以及时获取维度表的数据变更但因数据保存在内存中因此支持的维度表数据量较小。 示例代码参考如下 首先将维度表进行广播。 //维度数据源 DataStreamTuple2Integer,String dimSource env.addSource(new DimSource1());// 生成MapStateDescriptor MapStateDescriptorInteger,String dimState new MapStateDescriptorInteger, String(dimState,BasicTypeInfo.INT_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO); BroadcastStreamTuple2Integer,String broadcastStream dimSource.broadcast(dimState); 实现BroadcastProcessFunction类的processElement方法处理事实流与广播流的关联并返回关联结果。 SingleOutputStreamOperatorString output dataSource.connect(broadcastStream).process(new BroadcastProcessFunctionTuple2String, Integer, Tuple2Integer, String, String() {Overridepublic void processElement(Tuple2String, Integer input, ReadOnlyContext readOnlyContext, CollectorString collector) throws Exception {ReadOnlyBroadcastStateInteger,String state readOnlyContext.getBroadcastState(dimState);String name input.f0;Integer city_id input.f1;String city_nameNULL;if (state.contains(city_id)){city_namestate.get(city_id);collector.collect(result is : name ,city_id ,city_name);}} 实现BroadcastProcessFunction类的processBroadcastElement方法处理广播流数据将新的维度表数据进行广播。 Override public void processBroadcastElement(Tuple2Integer, String input, Context context, CollectorString collector) throws Exception {LOG.info(收到广播数据input);context.getBroadcastState(dimState).put(input.f0,input.f1); } 04 Temporal Table Function Join Temporal Table Function Join仅支持在Flink SQL API中使用需要将维度表数据作为流的方式传入Flink Job。该种方案可支持大数据量的维度表且维度表更新及时关联数据准确性更高缺点是会占用状态后端和内存的资源同时自行实现的代码复杂度过高。 Temporal Table是持续变化表上某一时刻的视图Temporal Table Function是一个表函数传递一个时间参数返回Temporal Table这一指定时刻的视图。可以将维度数据流映射为Temporal Table主流与这个Temporal Table进行关联可以关联到某一个版本历史上某一个时刻的维度数据。 示例代码参考如下 public class TemporalFunTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);EnvironmentSettings bsSettings EnvironmentSettings.newInstance().inStreamingMode().build();StreamTableEnvironment tableEnv StreamTableEnvironment.create(env, bsSettings);env.setParallelism(1);//定义主流DataStreamTuple3String,Integer,Long dataSource env.addSource(new EventSource2()).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorTuple3String,Integer,Long(Time.seconds(0)){Overridepublic long extractTimestamp(Tuple3String, Integer, Long stringIntegerLongTuple3) {return stringIntegerLongTuple3.f2;}});//定义维度流DataStreamTuple3Integer, String, Long cityStream env.addSource(new DimSource()).assignTimestampsAndWatermarks(//指定水位线、时间戳new BoundedOutOfOrdernessTimestampExtractorTuple3Integer, String, Long(Time.seconds(0)) {Overridepublic long extractTimestamp(Tuple3Integer, String, Long element) {return element.f2;}});//主流用户流, 格式为user_name、city_id、tsTable userTable tableEnv.fromDataStream(dataSource,user_name,city_id,ts.rowtime);//定义城市维度流,格式为city_id、city_name、tsTable cityTable tableEnv.fromDataStream(cityStream,city_id,city_name,ts.rowtime);tableEnv.createTemporaryView(userTable, userTable);tableEnv.createTemporaryView(cityTable, cityTable);//定义一个TemporalTableFunctionTemporalTableFunction dimCity cityTable.createTemporalTableFunction(ts, city_id);//注册表函数tableEnv.registerFunction(dimCity, dimCity);Table u tableEnv.sqlQuery(select * from userTable);u.printSchema();tableEnv.toAppendStream(u, Row.class).print(user streaming receive : );Table c tableEnv.sqlQuery(select * from cityTable);c.printSchema();tableEnv.toAppendStream(c, Row.class).print(city streaming receive : );//关联查询Table result tableEnv.sqlQuery(select u.user_name,u.city_id,d.city_name,u.ts from userTable as u , Lateral table (dimCity(u.ts)) d where u.city_idd.city_id);//打印输出DataStream resultDs tableEnv.toAppendStream(result, Row.class);resultDs.print(\t\t join result out:);env.execute(joinDemo);} } 最后总结各种维度表关联方案的特点如下
http://www.zqtcl.cn/news/603923/

相关文章:

  • 赣州本地网站网站怎么写
  • 物业公司网站设计湛江做网站软件
  • 做招聘求职网站wordpress启用插件出错
  • 珠海网站运营网站个人备案流程
  • 网站开发用什么图片格式最好网络营销名词解释是什么
  • 做柜子网站老电脑做网站服务器
  • 域名购买网站网店装修是什么
  • wordpress 网站备份为什么企业要建设自己的企业文化
  • 想做一个部门的网站怎么做潍坊网站建设价
  • 网站建设公司的公司哪家好什么行业必须做网站
  • 电子商务网站前台设计wordpress 上传文件大小
  • 深圳市住房和城乡建设局网站非常好的资讯网站设计
  • 长春作网站建设的公司国家建设环保局网站
  • 网站开发的有哪些好的软件wordpress菜单栏的函数调用
  • 家庭清洁东莞网站建设技术支持建筑模板厂投资多少钱
  • 郑州企业建站详情网站开发和网页开发有什么区别
  • 山西古建筑网站个人网站可以做自媒体吗
  • 腾讯云服务器可以做网站wordpress中文正式版
  • 做相亲网站赚钱吗vultr部署wordpress
  • 网站被挂马原因做网站较好的框架
  • 网站开发毕业设计参考文献自考大型网站开发工具
  • p2p网站建设方案策划书黄山旅游攻略冬季
  • 最世网络建设网站可以吗小说网站制作开源
  • 广州网站建设知名 乐云践新网页界面制作
  • 沈阳网站哪家公司做的好招标信息发布
  • 兰州企业网站h5页面用什么软件
  • 东莞自助建站软件ppt怎么做 pc下载网站
  • 兴化网站建设价格怎样用自己的电脑,做网站
  • 东莞网站建设企慕网站名称 注册
  • 佛山网站建设服务商百度推广客户端手机版下载