当前位置: 首页 > news >正文

万网速成网站凡科网

万网速成网站,凡科网,东莞疾控中心最新通知,重庆大型网站建设重庆网站制作Flink SQL 语法篇#xff08;四#xff09;#xff1a;Group 聚合、Over 聚合 1.Group 聚合1.1 基础概念1.2 窗口聚合和 Group 聚合1.3 SQL 语义1.4 Group 聚合支持 Grouping sets、Rollup、Cube 2.Over 聚合2.1 时间区间聚合2.2 行数聚合 1.Group 聚合 1.1 基础概念 Grou… Flink SQL 语法篇四Group 聚合、Over 聚合 1.Group 聚合1.1 基础概念1.2 窗口聚合和 Group 聚合1.3 SQL 语义1.4 Group 聚合支持 Grouping sets、Rollup、Cube 2.Over 聚合2.1 时间区间聚合2.2 行数聚合 1.Group 聚合 1.1 基础概念 Group 聚合定义支持 Batch / Streaming 任务Flink 也支持 Group 聚合。Group 聚合和上面介绍到的窗口聚合的不同之处就在于 Group 聚合是按照数据的类别进行分组比如年龄、性别是横向的而窗口聚合是在时间粒度上对数据进行分组是纵向的。如下图所示就展示出了其区别。其中 按颜色分 key横向就是 Group 聚合按窗口划分纵向就是 窗口聚合。 1.2 窗口聚合和 Group 聚合 应用场景一般用于对数据进行分组然后后续使用聚合函数进行 count、sum 等聚合操作。 那么这时候小伙伴萌就会问到我其实可以把窗口聚合的写法也转换为 Group 聚合只需要把 Group 聚合的 Group By key 换成时间就行那这两个聚合的区别到底在哪 首先来举一个例子看看怎么将 窗口聚合 转换为 Group 聚合。假如一个窗口聚合是按照 1 1 1 分钟的粒度进行聚合如下 滚动窗口 SQL -- 数据源表 CREATE TABLE source_table (-- 维度数据dim STRING,-- 用户 iduser_id BIGINT,-- 用户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL 5 SECOND ) WITH (connector datagen,rows-per-second 10,fields.dim.length 1,fields.user_id.min 1,fields.user_id.max 100000,fields.price.min 1,fields.price.max 100000 )-- 数据汇表 CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint ) WITH (connector print )-- 数据处理逻辑 insert into sink_table select dim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval 1 minute) AS STRING)) * 1000 as window_start from source_table group bydim,-- 按照 Flink SQL tumble 窗口写法划分窗口tumble(row_time, interval 1 minute)转换为 Group 聚合 的写法如下 -- 数据源表 CREATE TABLE source_table (-- 维度数据dim STRING,-- 用户 iduser_id BIGINT,-- 用户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL 5 SECOND ) WITH (connector datagen,rows-per-second 10,fields.dim.length 1,fields.user_id.min 1,fields.user_id.max 100000,fields.price.min 1,fields.price.max 100000 );-- 数据汇表 CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint ) WITH (connector print );-- 数据处理逻辑 insert into sink_table select dim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start from source_table group bydim,-- 将秒级别时间戳 / 60 转化为 1mincast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)确实没错上面这个转换是一点问题都没有的。 但是窗口聚合和 Group by 聚合的差异在于 本质区别窗口聚合是具有时间语义的其本质是想实现窗口结束输出结果之后后续有迟到的数据也不会对原有的结果发生更改了即输出结果值是定值不考虑 allowLateness。而 Group by 聚合是没有时间语义的不管数据迟到多长时间只要数据来了就把上一次的输出的结果数据撤回然后把计算好的新的结果数据发出。运行层面窗口聚合是和 时间 绑定的窗口聚合其中窗口的计算结果触发都是由 时间Watermark推动的。Group by 聚合完全由 数据 推动触发计算新来一条数据去根据这条数据进行计算出结果发出由此可见两者的实现方式也大为不同。 1.3 SQL 语义 SQL 语义这里也拿离线和实时做对比Order 为 Kafkatarget_table 为 Kafka这个 SQL 生成的实时任务在执行时会生成三个算子。 数据源算子From Order数据源算子一直运行实时的从 Order Kafka 中一条一条的读取数据然后一条一条发送给下游的 Group 聚合算子向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送相同的 key 发到同一个 SubTask并发 中。Group 聚合算子group by key sum / count / max / min接收到上游算子发的一条一条的数据去状态 state 中找这个 key 之前的 sum / count / max / min 结果。如果有结果 oldResult拿出来和当前的数据进行 sum / count / max / min 计算出这个 key 的新结果 newResult并将新结果 [key, newResult] 更新到 state 中在向下游发送新计算的结果之前先发一条撤回上次结果的消息 -[key, oldResult]然后再将新结果发往下游 [key, newResult]如果 state 中没有当前 key 的结果则直接使用当前这条数据计算 sum / max / min 结果 newResult并将新结果 [key, newResult] 更新到 state 中当前是第一次往下游发则不需要先发回撤消息直接发送 [key, newResult]。数据汇算子INSERT INTO target_table接收到上游发的一条一条的数据写入到 target_table Kafka 中这个实时任务也是 24 24 24 小时一直在运行的所有的算子在同一时刻都是处于 running 状态的。 1.4 Group 聚合支持 Grouping sets、Rollup、Cube Group 聚合也支持 Grouping sets、Rollup、Cube。举一个 Grouping sets 的案例 SELECT supplier_id, rating, product_id, COUNT(*) FROM (VALUES(supplier1, product1, 4),(supplier1, product2, 3),(supplier2, product3, 3),(supplier2, product4, 4)) AS Products(supplier_id, product_id, rating) GROUP BY GROUPING SET (( supplier_id, product_id, rating ),( supplier_id, product_id ),( supplier_id, rating ),( supplier_id ),( product_id, rating ),( product_id ),( rating ),( ) )2.Over 聚合 Over 聚合定义支持 Batch / Streaming可以理解为是一种特殊的滑动窗口聚合函数。 那这里我们拿 Over 聚合 与 窗口聚合 做一个对比其之间的最大不同之处在于 窗口聚合不在 group by 中的字段不能直接在 select 中拿到。Over 聚合能够保留原始字段。 注意其实在生产环境中Over 聚合的使用场景还是比较少的。在 Hive 中也有相同的聚合但是小伙伴萌可以想想你在离线数仓经常使用嘛 应用场景计算最近一段滑动窗口的聚合结果数据。实际案例查询每个产品最近一小时订单的金额总和。 SELECT order_id, order_time, amount,SUM(amount) OVER (PARTITION BY productORDER BY order_timeRANGE BETWEEN INTERVAL 1 HOUR PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum FROM OrdersOver 聚合的语法总结如下 SELECTagg_func(agg_col) OVER ([PARTITION BY col1[, col2, ...]]ORDER BY time_colrange_definition),... FROM ...ORDER BY必须是时间戳列事件时间、处理时间。PARTITION BY标识了聚合窗口的聚合粒度如上述案例是按照 product 进行聚合。range_definition这个标识聚合窗口的聚合数据范围在 Flink 中有两种指定数据范围的方式。第一种为 按照行数聚合第二种为 按照时间区间聚合。如下案例所示。 2.1 时间区间聚合 按照时间区间聚合就是时间区间的一个滑动窗口比如下面案例 1 1 1 小时的区间最新输出的一条数据的 sum 聚合结果就是最近一小时数据的 amount 之和。 CREATE TABLE source_table (order_id BIGINT,product BIGINT,amount BIGINT,order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),WATERMARK FOR order_time AS order_time - INTERVAL 0.001 SECOND ) WITH (connector datagen,rows-per-second 1,fields.order_id.min 1,fields.order_id.max 2,fields.amount.min 1,fields.amount.max 10,fields.product.min 1,fields.product.max 2 );CREATE TABLE sink_table (product BIGINT,order_time TIMESTAMP(3),amount BIGINT,one_hour_prod_amount_sum BIGINT ) WITH (connector print );INSERT INTO sink_table SELECT product, order_time, amount,SUM(amount) OVER (PARTITION BY productORDER BY order_time-- 标识统计范围是一个 product 的最近 1 小时的数据RANGE BETWEEN INTERVAL 1 HOUR PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum FROM source_table2.2 行数聚合 按照行数聚合就是数据行数的一个滑动窗口比如下面案例最新输出的一条数据的 sum 聚合结果就是最近 5 5 5 行数据的 amount 之和。 CREATE TABLE source_table (order_id BIGINT,product BIGINT,amount BIGINT,order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),WATERMARK FOR order_time AS order_time - INTERVAL 0.001 SECOND ) WITH (connector datagen,rows-per-second 1,fields.order_id.min 1,fields.order_id.max 2,fields.amount.min 1,fields.amount.max 2,fields.product.min 1,fields.product.max 2 );CREATE TABLE sink_table (product BIGINT,order_time TIMESTAMP(3),amount BIGINT,one_hour_prod_amount_sum BIGINT ) WITH (connector print );INSERT INTO sink_table SELECT product, order_time, amount,SUM(amount) OVER (PARTITION BY productORDER BY order_time-- 标识统计范围是一个 product 的最近 5 行数据ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum FROM source_table预跑结果如下 I[2, 2021-12-24T22:18:19.147, 1, 9] I[1, 2021-12-24T22:18:20.147, 2, 11] I[1, 2021-12-24T22:18:21.147, 2, 12] I[1, 2021-12-24T22:18:22.147, 2, 12] I[1, 2021-12-24T22:18:23.148, 2, 12] I[1, 2021-12-24T22:18:24.147, 1, 11] I[1, 2021-12-24T22:18:25.146, 1, 10] I[1, 2021-12-24T22:18:26.147, 1, 9] I[2, 2021-12-24T22:18:27.145, 2, 11] I[2, 2021-12-24T22:18:28.148, 1, 10] I[2, 2021-12-24T22:18:29.145, 2, 10]当然如果你在一个 SELECT 中有多个聚合窗口的聚合方式Flink SQL 支持了一种简化写法如下案例 SELECT order_id, order_time, amount,SUM(amount) OVER w AS sum_amount,AVG(amount) OVER w AS avg_amount FROM Orders -- 使用下面子句定义 Over Window WINDOW w AS (PARTITION BY productORDER BY order_timeRANGE BETWEEN INTERVAL 1 HOUR PRECEDING AND CURRENT ROW)
http://www.zqtcl.cn/news/134668/

相关文章:

  • 山东省建设执业师网站建设网站图片
  • 网站建设的安全可行性网站建设教学设计
  • 网站架设建设动易门户网站价格
  • 公司快速建站商城网站建设讯息
  • it公司做网站用什么软件鲁中网站
  • 制作属于自己的app教程北京和隆优化招聘
  • wordpress会员卡系统青岛百度优化
  • 网站的管理系统网站权限配置
  • 龙岗高端网站建设在进行网站设计时
  • 网站制作定制浙江交工宏途交通建设有限公司网站
  • 域名网站计划怎么写高端网站建设 引擎技
  • 做自己的网站流量怎么桂林人论坛桂林板路
  • 上海制作网站多少钱wordpress主题站主题
  • 企业网站开发软件WordPress访问者ip
  • 视频网站dedecms在源码之家下载的网站模板可以作为自己的网站吗
  • 西宁好的网站建设公司怎样将视频代码上传至网站
  • 内网网站开发专业建站公司报价
  • 做地方网站需要什么部门批准天津专业做标书
  • 域名注册信息查询网站推广seo是什么
  • 做外贸网站哪家公司好常见的管理系统
  • 网站设计报价方案微信公众号外包
  • 网站设计遇到难题wordpress qq 微博
  • 网站模板种类长沙seo推广优化
  • 郑州网络建站公司wordpress安装及配置
  • 福州移动网站建设公司注册地址怎么写
  • 网站线上投票怎样做做铁艺需要什么网站
  • 襄阳营销型网站建设网站开发语言排行榜
  • 网站架构演变流程淄博亿泰
  • 电子商务网站功能介绍招商网站建设
  • 哈尔滨模板网站建站市场监督管理局12315