网站制作软件 aws,编程必背100个代码,WordPress 更改H标签,潍坊网站建设公司排名1. 简单订单统计
假设有以下两个订单流数据#xff0c;数据字段分别为用户ID、购买的商品名称、商品数量。
数据流A#xff1a; 1L,尺子,3 1L,铅笔,4 3L,橡皮,2 数据流B#xff1a; 2L,手表,3 2L,笔记本,3 4…1. 简单订单统计
假设有以下两个订单流数据数据字段分别为用户ID、购买的商品名称、商品数量。
数据流A 1L,尺子,3 1L,铅笔,4 3L,橡皮,2 数据流B 2L,手表,3 2L,笔记本,3 4L,计算器,1 目标合并两个流的数据并筛选出商品数量大于2的订单数据。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{$,EnvironmentSettings}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api_/**
* Flink SQL统计订单流数据
* 知识点DataStream转为Table、视图Table转为DataStream
*/
object FlinkSQLDemo{def main(args:Array[String]):Unit{//创建流执行环境val envStreamExecutionEnvironment.getExecutionEnvironment//创建EnvironmentSettings实例并设置参数val settingsEnvironmentSettings.newInstance() //创建一个用于创建EnvironmentSettings实例的构建器.useBlinkPlanner() //将Blink计划器设置为所需的模块默认.inStreamingMode() //设置组件以流模式工作默认启用.build() //创建一个不可变的EnvironmentSettings实例//构建流式表执行环境StreamTableEnvironmentval tableEnv: StreamTableEnvironmentStreamTableEnvironment.create(env,settings)//构建订单数据流Aval orderStreamA:DataStream[Order]env.fromCollection(List(Order(1L,尺子,3),Order(1L,铅笔,4),Order(3L,橡皮,2)))//构建订单数据流Bval orderStreamA:DataStream[Order]env.fromCollection(List(Order(2L,手表,3),Order(2L,笔记本,3),Order(4L,计算器,1)))//将DataStream转为Table,并指定Table的所有字段val tableA: TabletableEnv.fromDataStream(orderStreamA,$user,$product,$amount)//将Table的schema以摘要格式打印到控制台tableA.printSchema()//(// user BIGINT,// product STRING,// user INT,//)//将DataStream转为视图视图名称为tableB,并指定视图的所有字段tableEnv.createTemporaryView(tableB,orderStreamB,$(user),$(product),$(amount))//执行SQL查询合并查询结果println(tableA默认表名tableA.toString)val resultTable:TabletableEnv.sqlQuery(SELECT * FROM tableA WHERE amount2 UNION ALL SELECT * FROM tableB WHERE amount 2)//将结果Table转为仅追加流val dataStreamResulttableEnv.toAppendStream[Order](resultTable)//将流打印到控制台dataStreamResult.print()//触发程序执行env.execute()}
}//创建订单样例
case class Order(user:Long,product:String,amount:Int)
在IDEA本地执行上述代码控制台输出结果如下 1 Order(1,铅笔,4) 11 Order(2,笔记本,3) 10 Order(2,手表,3) 12 Order(1,尺子,3)