清苑区建设局网站,正规网站建设制作,服务佳的网站建设,宜家家居官网网上商城app1. 系统架构 2. 介绍流程
公司的困难数据的来源 业务日志 Flume采集日志数据 选型 ETL flume内存不够#xff0c;通过ganglia监控器发现 提高吞吐量#xff0c;batchSize kafka 高效读写 提高吞吐量 kafka挂了 kafka丢数问题 数据重复问题 数据乱序问题 消费策略…1. 系统架构 2. 介绍流程
公司的困难数据的来源 业务日志 Flume采集日志数据 选型 ETL flume内存不够通过ganglia监控器发现 提高吞吐量batchSize kafka 高效读写 提高吞吐量 kafka挂了 kafka丢数问题 数据重复问题 数据乱序问题 消费策略 zookeeper CAP只满足CP 非第一次选举 Flume发到hadoop source、channel、sink选择 小文件问题 har归档hive合并 零点漂移问题(拦截器) 业务数据 全量、增量 datax全量同步 空值问题 Maxwell增量数据 优点 原理 数仓计算引擎选择 通过比较选hive on Spark hive 组成 元数据换成MySQL HQL转成MR 外部表、内部表 系统函数 自定义UDF hive优化 数据倾斜举例加购到支付的一个平均使用时长用累积型快照事实表解决 建模 数据调研java的表产品的指标通过拆分指标确定能不能做 明确数据域 构建业务总线矩阵 建模自下而上首先从ODS层然后是DIM层和DWD层分别介绍 ODS三件事 DWD三类事务型事实表、周期型快照事实表、累积型快照事实表 DIM拉链表 ETL 指标体系建设 自上而下先有ADS再有DWS ADS的一些指标 指标拆分 拆分完之后去找公共的业务过程统计周期统计粒度然后建DWS层宽表 DataX导入到MySQL superset可视化 DS调度 这就是整个离线项目的大致过程
3. 详细介绍
在上个公司老板遇到了一些困难他想详细的了解公司的一些运营情况比如说了解日活、新增、留存转化率这些信息但是公司数据量特别庞大直接用MySQL是统计不了的而且统计历史数据的时候更不行所以这时候他决定组建这个大数据团队首先是购买相关的一些服务器阿里云或者物理机都可以买完服务器之后开始进行一个项目调研在调研的时间了解了一下公司的数据来源一共有两种一个是Java后台对应的业务数据在MySQL里面存放的还有就是前端埋点产生的用户行为数据以文件形式存放在日志服务器中并且在接收数据的时候都采用了Nginx进行一个负载均衡。 用户行为数据是保存30天起到一个备份的作用防止后续过程中数仓当中任何位置出现异常都有最原始的一个数据的备份都知道在企业当中数据是最重要的相对来说磁盘比较廉价所以我们备份30天接下来就是考虑如何将数据传输到Hadoop当中去进行后续的一个处理 在这里面我调研过很多技术我们也可以直接写一个java程序读取这个文件上传我们也可以写一个JAVA程序创建一个Hadoop客户端去进行put一次性put上来也行但是我们考虑了一下感觉这个直接put上来不怎么好因为这个log文件越来越大而且每天几百Gput上去的时间会比较长比较慢最终决定通过Flume来实时的采集数据你来一点我就采一点这样就省去一次性同步一个大文件的过程并且可以大大提高效率并且这个flume是专门为采集日志而生的就是用来读取文件然后进行一个传输他就是干这个事儿了。 并且Flume采集这个文件正常情况下可以直接就可以写到Hadoop当中但是我们又进行了一个架构上的思考因为我们需求当中需要我们的离线出仓还要兼顾未来实时数仓的一个架构也就说离线数据和实时数据共用一套采集系统那这时候我们就考虑说那能不能加上一个kafka那这样实时数据和离线数据都可从里面去取同时这个kafka也起到了一定的消峰作用比如说像双十一、618采集过来这个海量数据如果直接传到hadoop当中就会出现阻塞的情况但这时候呢如果写到这个kafka中然后慢慢的进行消费还有就是前面这个Flume这块涉及到source、channel、sink的选型问题这个Source呢我们选择的是Taildir Source因为它能支持断点续传和对应的多目录。并且这个Taildir Source的底层原理也非常简单啊比如说你这个重来数据它会put到这个Channel当中之后会记录一个offset那么下次读取的时间会先看一下offset读到哪里了然后再读它不会导致数据丢数但是在极端情况有可能产生重复数据比如说你数据发过来之后这个提交offset的时侯它失败了因为这块没有做事物保护那就有可能产生重复数据但是产生这个重复数据的概率是比较低并且即使产生了重复数据我们后续也可以在数仓的dwd层也可以对它进行一个去重那下一个呢是Channel因为我们下一级是kafka所以channel这块我们果断采用的Kafka Channel因为Kafka Channel的性能是高于memory Channel Kafka Sink的因为后者的传输通道没有Kafka channel的传输通道快所以说这里我们选择是Kafka channel直接把数据就传到Kafka中首先是他的速度比较快另一方面呢它的可靠性也比较高因为它的数据是直接存到Kafka中kafka底层就是基于磁盘的所以这个可靠性是有保障的所以这里面选择了Kafka Channel。 同时在这个flume当中我们还做了一个ETL拦截器这个ETL拦截器主要是判断JSON是否完整因为这里主要是想减少网络上的带宽比如说你这个JSON根本不完整的你直接传过来传到最终的Hadoop中那就占用网络带宽那就这种数据我能不能在源头上就把它干掉因为就算你存到Hadoop当中在使用时也会解析失败那为什么没有把这个JSON详细的解析出来一个一个字段去进行判断把ETL的操作都放在这儿呢那因为这个flume相当于下水道你放的头发丝比较多的话都容易堵塞所以这个会影响到flume的吞吐量降低它传输的一个性能所以这块只能做简单的清洗操作并且像这种自定义拦截器的步骤也比较简单我只需要定一个类实现一个intercept的接口再去重写里面四个方法自主化关闭单event多event然后再写一个静态内部类Builder。然后之后呢打包上传到flume的lib包下然后在配置文件当中全类名拼接上到 $ 这样就可以实现之后呢我们还用到这个ganglia监控器可以监控这个flume的put事务当然我用的kafka channel就没有这个take事务了这里面有个put尝试提交的次数和最终success成功的次数如果你发现这个put大量成功但是这个尝试提交次数远远大于最终success成功的次数就说明发生了大量的这个回滚操作那就说明flume运行不太良好那这时候一般情况下就是你的内存不够导致的啊因为flume的默认内存只有20M。此时你需要把他增加到这个4-6个G可以去flume的env.sh当中修改对应的参数就可以了并且在使用flume当中想提高它对应的一个吞吐量可以去修改它里面的参数叫batchSize把这个值调大就可以提高它对应吞吐量但是带来的问题就是导致这个数据时间会有一定的延迟如果能接受这种延迟那就没有问题但正常情况我们是可以接受的了的因为数据传到Hadoop也是在这等着还有就是凌晨的时候才开始处理这个数据所以慢慢传也没有问题。
接下来就是数据传输到kafka中当时为什么选择kafka呢主要是因为kafka能做到一个高效读写比如说它首先是集群同时呢它可以设置这个分区增加并行度并且它底层是采用这个稀疏索引index进行存储的每4KB的数据去记录一条索引那这样这个处理的速度要快一些。另一个就是底层采用的是顺序读写可以达到600 M/S速度非常快还有就是零拷贝和页缓存技术那什么是零拷贝和页缓存呢比如说这个生产者啊发送到kafka对应的这个数据那你发送过来的数据这个kafka不对它进行处理直接扔给页缓存。 谁是页缓存呢就是Linux系统的内存直接给它那给它之后呢什么时候落盘呢首先第一个它内存不够的时候那肯定会落盘还有一种呢就是它这里面的这个数据不被经常访问了也会去进行落盘。当外部来访问kafka数据的时候会直接从内存里面返回数据。 那万一落盘了这个拿不到数据怎么办落盘之后可以再加载回来再进行应答他的操作都是内存操作速度非常非常快那另一方面什么叫零拷贝呢消费者来消费数据正常情况下框架设计都会走这个应用层去页缓存里面读取数据读过来之后得通过网卡发送给消费者但是通过网卡发送的时候都有一个发送缓冲流这是网卡内存的一个备份这样的话就会多一次拷贝过程但是kafka设计的就非常精妙他根本就没有这个应用层或者是应用层不做任何操作就直接去访问页缓存那页缓存中的数据直接就通过网卡返回给消费者就省去了网卡内存的一个拷贝过程这就是零拷贝。 在使用kafka过程中遇到过一些问题比如说提高这个kafka的吞吐量怎么提高kafka吞吐量呢其实这里面涉及到的角色比较多因为它是由生产者、broker和消费者这样一个构成任何一个环节都有可能阻塞吞吐量的提高我们需要进行综合的考量比如说我可以对参数进行一些优化 第一双端队列的32 M缓存我给他提高到64 M16K的批次大小给他提高到32K还有这个linger.ms默认0ms我给提高到10ms当然可能会造成一定的延迟那还有就是采用压缩我们采用snappy进行压缩以减少网络IO另一方面在broker端我可以增加对应的分区数同时我要求消费者端要么增加消费者要么增加CPU核数否则的话没有用并且在这个消费数据的时候呢默认一次拉取的数据量是50 M我给他提高到60 M甚至更高拉过来之后把他放到这个队列里我一次处理的条数它默认是500条那我可以提高到1000到3000条这样就可以去提高kafka的吞吐量同时在使用kafka中也遇到了那个挂了、丢了、重了乱序了等等一系列问题那比如说首先这个挂了你要先看日志看是什么原因导致它挂了一般情况下呢有可能是你资源不够那资源不够的话你可以用linux的高级命令查看磁盘比如说df -h查看CPU可以用这个top查看这个内存可以用jmap -heap有的时候内存不够的可能性也比较多因为它默认内存是1 G生产环境中一般我们得调到10~15 G当然还有一种情况就是这个呃像我之前有个同事误删了一个kafka节点删掉之后他当时慌的一批然后我分析了一下没啥问题最后跟我的猜测是一样的因为我们副本是两个把这个你正常删掉之后重新去给他服役上来就没有啥问题也不会导致数据这个丢失这是挂了那如果是丢数据了呢如果你要想保证数据不丢那你就将acks设置为-1然后将这个副本数设置为大于等于2还有isr里面最小副本数也设置为大于等于2那这样就可以了啊就可以保证了这就可以把丢数据的问题解决了。
那如果有重复数据呢可以使用这个幂等性那幂等性的原理其实就是你整过来的数据他在内存当中维护了一组数据的这个元数据。他会判断你这个数据之前发没发过发过的话那我就不再往下发了没发的话我再重新落盘然后再记住这个元数据就是这样一个过程。但是呢他怎么判断这两个数据相不相同呢条件有三个第一个是PID第二个是分区号第三个是序列号那这里面PID要注意一下因为PID是kafka每次一重启的时候就会发生变化并且kafka的内存全部就清空了那以后也许你是重复的数据那我也会认为是不重复的就会在极端情况产生一些重复数据所以幂等的话也不能完全的保证数据不重只能说他可以保证的是单分区单会话内数据不重多会话是肯定不行的。还有就是ack没有应答成功的情况下 要想保证数据不重复就需要开启事务事务是靠的五个API来处理的当然它底层是基于这个幂等性的也就是说你使用事务的时候必须得开启幂等性那他5个API是哪5个呢首先是初始化开启事务然后是在事务内提交已经消费的偏移量API最后是提交事务或者是回滚事务这5个API就可以控制事务的一个原子性如果写入broker失败了我们可以进行回滚。
那还有就是乱序数据那什么是乱序呢比如说第一个先保证有序吧先说有序有序怎么保证是有序呢就是kafka可以保证单分区内是有序的你往一个分区里面发那数据肯定是有序的如果非得要求多分区数据有序那你就得把所有的分区数据通过下游处理器去读出来然后进行重新排序当然还有一些场景就是希望把一张表的所有数据发送到某一个分区那你就把这个表名设置为key就行了就可以实现将一张表发送到某一个分区里面去。但是单分区中有可能会有乱序那如何解决这个问题呢第一个就是没有开启幂等性的情况下发送失败时不进行重试不就可以了就是将retries设置为0同时将request的那个参数in.flight也设置为1就是一次只发送一条数据。因为这里面是涉及到一个broker 还有这个生产者生产者往broker发送数据的时候这里面有in.flight默认是5也就是说我可以源源不断的异步的往broker进行一个发送发送过来之后会判断这个数据的一个序列号是否是连续的你不是连续的那就不行不连续就不允许发送那不就行了那你就源源不断往这发就行了。第二个就是开启幂等性那这个幂等性它在broker端会自动帮我们对这个数据进行一个排序但最多一次排五个所以可以把in.flight设置为小于等于5把retries设置为默认值再加上这个幂等性基本上就可以了而且这个in.flight如果你将他设置为大于等于5它直接就给你报错了所以你也设置不成功。 kafka里面还有一个消费策略它包含了Range、RoundRobin、粘性这三种。 那这里面主要考虑一下Range、RoundRobinRange的特点就是它的这个分配算法特别快比如说你七个分区三个消费者他给你七除三等于二余一那就是第一个消费者给你三个然后之后两个两个的然后这样就完事了所以这种分配算法速度非常非常快好多这个企业愿意用但是呢它有个毛病如果你针对这个多个topic的这种情况它是每个topic都单独的进行这样的一个分配那就会导致多出来的分区都砸到第一个消费者最终它容易造成数据倾斜所以如果topic比较多那就不要用它了那就用RoundRobin他的的特点就比如说还是七个分区三个消费者它采用的是轮循一人一个轮着来但是这种方法相对来说是有点慢但好处就是不论有多少topic我都是轮循这个消费者与消费者之间最多就差一个分区所以就不会产生数据倾斜另外就是粘性正常分配的时候一般都不考虑它都是在这个消费者挂了的时候如果没有加上粘性那再平衡时每个分区和消费者之间都可能重新分配这个改动就比较大那反过来我加上粘性之后那只是把挂了的这个消费者的任务重新分配给其他消费者那这样的话分配的速度要更快一些影响范围也要更小一些所以通常就是这个粘性结合Range或者RoundRobin进行使用这个官网默认就是采用的这个Range 粘性这种方式进行使用的。 那kafka是依赖于zookeeper的就是这个zookeeper是非常不错的轻量级框架那在这个使用zookeeper的时候我也对它进行过详细的研究那比如说zookeeper里面它有一个CAP法则但是它只满足对应的CP法则C就是数据一致性P是这个分区容错性但是它不满足的是A就是可用性因为zookeeper在这个重新选举的时候是不可能对外提供服务的所以说就不能保证这个可用性。还有就是他在非第一次选举的时候epoch事务id服务器id它就遵循一个这样的选举规则当然了我也了解了一下这个kafka 2.8以后是可以不用这个zookeeper的并且我大胆的预言了一下等这个kafka再升级到稳定版并且稳定一点之后就可以考虑把zookeeper干掉省去一定的这个外部通讯的这个资源那么效率也会适当提高一些。 接着就是考虑怎么将这个Kafka的数据去传输到hadoop中其实这块呢有很多办法比如说我们可以写个java程序消费kafka然后上传到hadoop中也还可以写一个flink程序source源对接它然后这个Sink端对接Hadoop也没问题但是这都需要我们写代码相对就麻烦一点然后又考虑了一下能不能用其他组件因为我们前边是flume那我这边也可以用呀最后我们项目选用的是kafka source和hdfs Sink中间的channel因为传输的是日志丢一些条数也无所谓于是我们选用了Memory Channel。 但是我们用hdfs Sink的时候突然发现这个hdfs中产生了大量的小文件当时我同事就慌了这时我还是比较淡定的我说这个有问题是好事儿解决掉就可以了然后果断的查阅这个官方手册一看这里面确实有参数可以去控制可以控制生成文件的大小然后我把大小控制成128 M但是以我的经验来看只控制大小是肯定不行的万一这个过来的数据总量都到达不了128 M那怎么办我寻思着如果再有个时间配合他就完美了一看文档果然有这个时间参数又给他添加了一下但是我又翻了一下这个文档里面还有个设置event的个数它默认给你设置成10就是表示你过来10个event数据我给你生成一个文件但这样还是有产生小文件的这种风险那果断的设置为0给他禁止掉所以控制时间、大小、event个数这样在hdfs上就没有产生一些小的文件那之前产生的小文件怎么办也有办法可以采用har归档就是把这个大量的小文件归档在一起主要是减少NameNode的一个压力还有就是如果你是MR程序可以采用这个combinerTextinputFormat如果你用的是hive的话你可以采用这个combineHiveInputFormat都可以就是将这个大量的小文件放在一起统一进行一个切片那就减少了mapTask开启的个数也就减少了占用的内存并且同时也可以开启JVM重用来减少JVM开关的一个时间。
并且在使用这个flume过程中我们还解决了一个零点漂移的问题这是阿里当年的一个难题什么难题呢因为我这个数据存储是按天创建分区的比如说18号19号每天一个分区。然后在产生数据的时候比如说是这个18号23:59:59产生了一条日志但是由于整个传输通道肯定是有一定的延迟的那传到我下一级的时候已经变成了19号0000:50。他在存储时是用的系统的当前时间那当前时间就是19号那它就会把这个数据写到19号分区里面去但是这是18号产生的数据不能给我放到19号里那像阿里当年是把下一个分区的最近15分钟的数据都读出来然后通过时间过滤将18号的数据过滤出来再给它追加到18号分区这显然就很麻烦但是现在这个技术都在进步直接在这块放上一个时间戳拦截器我把你这条数据拦下来拦下来之后取出他的时间戳然后把它给hdfs sink的时间戳变量然后他就根据时间戳变量直接写到18号分区了这样就省去了后续再处理这个麻烦事解决了零点漂移的问题。 接下来就是对业务数据的处理。处理业务数据就分情况了里面涉及到一个同步策略同步策略有全量和增量全量就是把所有的数据一次性全拿过来尤其是数据量较小的表比如这个商品的SKU表商品的SPU表商品一级分类二级分类三级分类地区表类似这些表数据量都非常非常小直接就全量同步过来了但是还有一些大表。例如一些事实表像加购下单支付物流这些数量比较大如果每天都全量的话那压力太大不光同步的速度比较慢也很占用磁盘空间这时候就考虑说用增量
处理这个全量的数据的组件目前市场上比较主流的有sqoop和DataXDataX对接这个数据源比较多像OracleMySQLHDFS都可以很好的支持。而且我们的数据量也不大全量的数据加在一起每天才1~2 G并且这个DataX属于叫单节点基于内存的一个同步所以速度也是非常快可能十几分钟就完事了但是使用DataX的时候它是有点小bug的比如说mysql里面的空值就是空但是我们同步到hdfs中未来就是给hive使用的而hive的空是\N那明显这个需要转换一下但是这个免费版本没有修改当然我可以自己改源码或者不改的话只能在hive创建表时进行空值转换将其转换成空串也是可以的那对于这个DataX调优的话我们之前的那些数据量其实也涉及不到调优但是如果贵公司用的这个DataX导的这个数据量比较大那就可能需要去增加它的内存或者增加对应的线程数。
接下来就是处理增量数据。我们可以用Maxwell进行一个同步。为什么选择Maxwell这个同类型的产品有很多比如说Flink CDC 等原因很简单因为Maxwell支持断点续传和首日全量可以进行一个初始化那Flink CDC各方面和这个Maxwell一样但是比较遗憾的就是当时它是1.0版本有锁表的bug所以这个在生产环境当中没法正常使用到了应该是2022年的这个三月份的左右吧应该他推出的这个2.0解决了这个锁表bug但是当时我们只能上Maxwell了Maxwell的底层原理也比较简单就是MySQL的主从复制伪装成一个Mysql的从库获取这个BinLog的数据接着就把增量数据同步过来了增量数据同步的都是一些事实表。 接下来我们又去决定用什么计算引擎去进行一个数仓的搭建我们考虑了hive的 Mr Spark Sql还有hive on Spark最后我们选择了hive on Spark。因为这几个里面性能最好的就是Spark Sql但是Spark的生态不是特别完善比如说权限管理、元数据管理Spark都是不支持的所以我们只能用hive on Spark进行解决。 在使用hive过程中我们也对hive进行了研究比如说hive的组成首先是客户端然后是元数据它默认元数据存储在debery数据库不支持多人开发所以我们给它放到mySql里面同时还有对应的解析器数据生成器还有逻辑计划生成器逻辑计划优化器物理计划生成器物理计划优化器以及对应的执行器对这个Sql进行一个解析其实这些模块就是将你写的这个HQL翻译成可以执行的MR程序首先将这个HQL翻译成AST抽象语法树然后对这个抽象语法树进行逻辑计划的一个生成然后进行优化然后再通过物理计划生成器生成物理计划再进行优化最后一执行就完事了hive是基于hdfs它的底层存储是落在HDFS上并且默认的计算引擎是MR任务调度用的是Yarn的这一套这个MR引擎之后我们换成了Spark引擎进行计算。 还有就是我们在hive中创建表的时候通常都会创建外部表因为外部表在删除数据的时候只删除元数据不会删除原始数据起到一个安全作用那像我们只有自己使用的那种临时表才会创建内部表那个删了也没事还有像我们在公司中也大量使用系统函数我看了一下官网上有289个但是我们并没有用那么多但是也用了很多比如date_add、date_sub、 datedifflast_date、next_date、nvl、 case when、if等函数但虽然说有这么多函数但是在我们这个开发中还可能遇到一些比较复杂的场景就需要我们去自定义函数来进行实现比如说可以自定义UDF和UDTF当然UDAF比较少那像UDF就是相当于我们用的算子map那UDTF就相当于Flat map那UDF自定义的时候非常简单只需要定义一个类继承这个GenericUDF里面就一个核心方法叫evaluate。在里面写自己的逻辑就行了当然还有一些初始化关闭的一些东西那UDTF就是属于这个炸裂一般的就是定义一个类继承GenericUDTF去写三个方法初始化、关闭、processprocess里面是核心逻辑初始化里面去声明类型及对应名称就可以了接着就需要对这个自定义函数进行打包上传到HDFS路径然后在hive的客户端进行注册去使用还有一些窗口函数像over它里面既可以进行分区又可以排序去进行一个开窗处理用它的时候统计最多的就是像一些7天内连续3天、topN还有这个同时在线人数等一些场景。 还有就是在使用hive当中也对其进行过一些优化比如说如果是大、小表join的情况下这个mapJoin默认打开就不要关闭了还有我们可以提前进行行列过滤就类似于这个谓词下推也可以把谓词下推功能开启也可以直接创建分区表去防止后续的全表扫描也可以创建分桶表当然还可以采用压缩去减少网络上数据的传输去减少磁盘IO还可以采用列式存储来加快查询的速度可以处理小文件去提前开启merge功能就是在执行MR程序的时候产生了大量的小文件会单独再开启个MR将这些小文件进行合并合并的规则就是小于16 M的文件就会认为是小文件给它合并到256 M还有就是如果只有map任务时是默认打开如果是MR任务就需要手动把它打开还有像小文件这种的我们可以开启这个combineHiveInputFormat将小文件打包在一起统一进行切片同时也可以开启JVM重用减少一些JVM开关的时间。同时我们还可以合理的设置reduce的个数那还有就是如果遇到group by的这种场景需要提前去开启mapside进行预聚合还有会将MR引擎更换为Spark引擎来进行相关的一些优化。 我们有时还会遇到数据倾斜比如说我们之前在统计一个各个省份的交易额这种的指标在yarn上就看到了个别省份有的运行的快有的运行的慢像快的能比这个慢的时间上差了20倍像北京、广东、江苏的这些地方执行的特别慢那这时候果断的把这个任务停掉然后开启group by的mapside进行预聚合同时又开启了skewgroup by进行二次聚合还有除了group by这种数据倾斜的问题我们还遇到这种join产生的数据倾斜首先就是大、小表join的场景一般就是事实表和维度表join这种场景特别多比如订单跟商品那这种我一般就是先开启mapjoin进行处理就可以了最怕的就是这种大表和大表的join比如说之前我们就统计了一个加购到支付的一个平均使用时长这种情况下就遇到了这个大表join大表的场景。接着我们就去可以开启skewjoin也可以开启smb 来分桶join但是这个办法就是他要求两张表必须得是分桶且有序还有也可以用左表随机右表扩容来解决但是会增加额外的一些操作虽然说能够让每个reduce里面处理的数据量少但是实际的工作量反而是大了但是它能把这个任务执行完那最牛的解决方式就是从建模上对他进行解决比如说我可以采用这个维度建模当中的累积型快照事实表在这两张表没有变大的时候我就开始进行了处理累积的过程这样的来做就避免了大表join大表这种场景。
接着我们就开始进行正常的建模首先呢就是进行数据调研先把JAVA后台的所有的表的数据都拿过来详细去看看完之后对这个业务有大概的一些了解我是怎么看的呢一般就是我模拟我自己是一个用户然后去在页面上浏览他能干哪些事儿可以跟后台哪些表有这个联系了解差不多之后我就找这个JAVA后台人员进行一个沟通去聊一下我的猜测是否是对的跟他沟通差不多之后我就跟那个产品经理聊聊对应的需求。 需求里面有原子指标派生指标衍生指标像 派生指标 原子指标 统计周期 统计粒度 业务限定。原子指标 业务过程 度量值 聚合逻辑。按这样一个过程去拆这里面的业务逻辑那如果业务逻辑里面有这个业务那我就能做没有这个业务就做不了所以这个得跟他沟通清楚下面就开始这个第二步去明确数据域其实也是跟着需求来的需求当中有哪些域就去处理哪些域的指标那首先呢是用户来到网站进行一个登录注册那就有了用户域然后准备逛一逛就产生了流量那流量就是用户的这个启动、页面、动作、错误、曝光就是流量域逛差不多了就去买东西了那不久产生了交易那交易的话就得是加购、下单、支付、物流、退单等交易域就出来了在这个交易得过程当中又进行了优惠券得使用活动的参加可以把他归结为工具域最后呢收到货了用了后可以进行一个互动去点赞、评价、收藏互动域也出来了。
明确这些数据域之后就是去构建业务总线矩阵构建业务矩阵就是把事实表往左边一放上面放上维度表他们之间有关联的根据坐标一打勾就行了。 构建业务总线矩阵之后才开始真正的建模。建模的话是自下而上首先从ODS层然后是DIM层和DWD层
ODS层里面就干了三件事。第一件事保持数据原貌不做任何修改起到一个备份的作用去防止数仓上面任何一层发生变化都有最原始的数据第二件事就是创建分区表防止后续的全表扫描第三件事就是采用压缩减少磁盘的存储空间。
接下来就是这个DWD主要处理的就是事实表这里面就涉及到事实表的分类了主要有三类事务型事实表、周期型快照事实表、累积型快照事实表大多数情况下呢我们是优先考虑事务性事实表因为事务性事实表特点就是处理原子操作是不可再切割的比如说加购、下单、支付、物流这些但是他不能解决所有问题比如说这个连续型指标或者是这个多张事实表关联的场景他不擅长那这种连续型指标就得用周期型快照如果是多事实表关联我们就用累积型快照具体的事务型事实表处理有标准的四步第一步选择业务过程第二步声明粒度第三步确定维度第四步确定事实那选择业务过程就选择产品经理感兴趣的也就是指标当中需要统计的这个业务第二步声明粒度就是一行信息代表什么含义可以代表一次下单一个月下单或一年下单那如果给你的一年下单那你没法统计一次下单那只给你一次下单你就可以统计一年下单所以这里面要保持最细粒度只要保证不做聚合操作就行。第三步确定维度就是我们产品经理统计指标当中需要的比如说我们需要这个用户商品活动、时间、地区、优惠券这些那就留下不需要的就把它干掉第四步确定事实确定事实表的度量值什么叫度量值就是可以累加的值例如个数、件数、金额把它确定好那这些确定完之后再看周期型快照就是在这个声明粒度的时候变成这个一天或者是一年看你自己这个周期是多少自己确定那这个累积型快照事实表只不过是在确定事实的时候要确定多个事实表的度量值就完事了。
接着就是DIM层主要是维度相关的一些处理就是一般没有度量值只要一些属性信息这些通常都是维度那在这里面主要做的比如说对用户做的拉链表因为用户表的特点就是缓慢变化有的时候一天变化一次一月变化一次不确定而且数据量还比较大所以要做拉链表对它进行处理拉链表也比较简单只需要在用户表末尾加上开始日期、结束日期然后进行初始化接下来跟第二天获取的新增和变化的数据进行关联之后取出最新的数据放到新分区旧的数据放到旧分区并且在DIM层我们还进行了维度整合比如说我们将商品表商品品牌表商品一级分类二级分类三级分类整合成商品维度表将省份、地区整合成地区维度表将这个活动信息活动规则整合成活动维度表就完事了。
DWD和DIM层还做了一些ETL清洗比如说清洗我们用的是HQL在这里面判断一些核心字段不能为空、一些重复数据的处理、相关数据的脱敏等一系列操作。
这之后整个建模就结束了那接着我们就开始进行一个指标体系建设它是自上而下先有ADS再有DWS。ADS我们统计的指标日活、新增、留存率最近7日内连续3日下单用户数各品牌商品收藏次数Top3等等有了这些指标之后需要进行拆分把派生指标变换成原子指标 统计周期 统计粒度 业务限定拆分完之后去找公共的业务过程统计周期统计粒度然后建DWS层宽表去进行构建构建完成之后DWS建完那整个数仓也就建完了那建完之后就是要把数仓中处理好的数据往外部系统中导那这里就可以用DataX进行同步DataX这边还会产生空值问题如果说你这里面是\N再往MySql里面导DataX就有对应的参数。 最终我们用superset进行可视化虽然说它的页面比较丑但是免费这点老板比较喜欢后面调度时我们用的DS进行数仓的调度因为它一方面是国产的并且这团队也比较强大各方面性能也比较OK最后就选了他当然在DS里面每天跑的指标平时也就100多个节假日的情况下一般是150个到200个左右。 之后元数据管理这块都是我同事做的可以用atlist也可以自己实现自己去解析相关的这个程序。这个不是我主导的了解的不是太多。 这些整完之后呢就是整个集群的监控是PrometheusGrafana可以监控各个组件某一个进程挂了可以直接触发报警。
这就是整个离线项目的大致过程。