Flink Series Articles
- 1. Flink deployment, core concepts, source, transformation and sink usage examples, the four cornerstones, and more: links to the combined series articles
- 13. Flink Table API & SQL: basic concepts, common API, and getting-started examples
- 14. Flink Table API & SQL data types: built-in data types and their properties
- 15. Flink Table API & SQL streaming concepts: dynamic tables, time attributes, how update results are handled, temporal tables, joins on streams, determinism on streams, and query configuration
- 16. Flink Table API & SQL, connecting to external systems: connectors and formats for reading and writing external systems, with a FileSystem example (1), an Elasticsearch example (2), an Apache Kafka example (3), a JDBC example (4), and an Apache Hive example (6)
- 17. Flink Table API: supported operations, part 1
- 20. Flink SQL Client: trying Flink SQL and submitting SQL jobs to the cluster without writing any code
- 22. Flink Table API & SQL: the CREATE (DDL) statements
- 24. Flink Table API & SQL Catalogs: introduction, catalog types, DDL via the Java API and SQL, and catalog operations (part 1); database and table operations via the Java API (part 2); view operations via the Java API (part 3); partition and function operations via the Java API (part 4)
- 26. Flink SQL: overview and getting-started examples
- 27. Flink SQL SELECT: select, where, distinct, order by, limit, set operations and deduplication (part 1); SQL hints and joins (part 2); window functions (part 3); window aggregation (part 4); group aggregation, over aggregation, and window joins (part 5); Top-N, window Top-N, and window deduplication (part 6); pattern recognition (part 7)
- 28. Flink SQL: DROP, ALTER, INSERT, and ANALYZE statements
- 29. Flink SQL: DESCRIBE, EXPLAIN, USE, SHOW, LOAD, UNLOAD, SET, RESET, JAR, JOB statements, UPDATE, DELETE (parts 1 and 2)
- 30. Flink SQL Client: configuration files, tables, views, etc., with Kafka and filesystem examples
- 32. Flink Table API & SQL: implementing user-defined sources and sinks, with detailed examples
- 41. Flink Hive dialect: introduction and detailed examples
- 42. Flink Table API & SQL: Hive Catalog
- 43. Flink reading from and writing to Hive, with detailed verification examples
- 44. Flink modules: introduction and usage examples, plus using Hive built-in functions and user-defined functions from Flink SQL (some claims found online appear to be wrong)

Contents:
- I. Table API Introduction
  - 1. Getting-started examples: 1) Maven dependencies; 2) Example 1: creating a table with SQL and with the API; 3) Example 2: creating a view with SQL and with the API; 4) Example: querying a table with window functions via the API
  - 2. Table query and filter operations
  - 3. Table column operations
  - 4. Table aggregation operations: 1) common example code; 2) group by; 3) GroupBy Window Aggregation; 4) Over Window Aggregation; 5) Distinct Aggregation; 6) Distinct
  - 5. Table join operations: 1) join examples; 2) temporal table example

This article shows, through examples, how to use the Table API to work with tables, views, and window functions, and how the Table API queries, filters, manipulates columns, aggregates, and joins tables. For set operations, order by, insert, group windows, over windows, and other table operations, see the next article in the series: 17. Flink Table API: supported operations, part 2.

The article assumes working Flink, Kafka, and Hive clusters. The Java API examples were built against Flink 1.17, and the SQL examples were run in a Flink 1.17 environment.

The article has five parts: getting-started examples, table query and filter operations, table column operations, table aggregation operations, and table join operations.
I. Table API Introduction
The Table API is a unified, relational API for batch and stream processing. Table API queries can run over batch or streaming input without any code changes. The Table API is a superset of SQL and is designed specifically for Apache Flink. It is offered as a Scala, Java, and Python API. Table API queries are defined in an embedded style in Java, Scala, or Python, with IDE support such as auto-completion and syntax validation, instead of being specified as string values the way plain SQL queries are.
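To make the embedded-DSL point concrete, here is a minimal hedged sketch, assuming a registered table named Orders with userId, amount, and status columns (illustrative names, not from this article), of the same query written once as a Table API expression and once as a SQL string:

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class EmbeddedVsSqlSketch {

	static void sketch(TableEnvironment tenv) {
		// Table API: the query is built from Java expressions, so the IDE can
		// auto-complete it and column references are validated while the plan is built.
		Table apiResult = tenv.from("Orders")
				.filter($("status").isEqual("paid"))
				.groupBy($("userId"))
				.select($("userId"), $("amount").sum().as("total"));

		// SQL: the same query as a plain string, parsed only when it is submitted.
		Table sqlResult = tenv.sqlQuery(
				"SELECT userId, SUM(amount) AS total FROM Orders WHERE status = 'paid' GROUP BY userId");
	}
}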
The Table API and Flink SQL share many concepts and much of their integration API. See the common concepts and API documentation to learn how to register tables or how to create a Table object. The streaming concepts page discusses streaming-specific notions such as dynamic tables and time attributes.

For details, see: 15. Flink Table API & SQL streaming concepts: dynamic tables, time attributes, how update results are handled, temporal tables, joins on streams, determinism on streams, and query configuration.
1. Getting-started examples
1) Maven dependencies
Unless otherwise noted, all examples in this article use the following Maven dependencies.
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-gateway</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-uber</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.0-1.17</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.12</artifactId>
        <version>1.17.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>3.1.2</version>
    </dependency>
    <!-- Flink connectors -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-compress</artifactId>
        <version>1.24.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
    </dependency>
</dependencies>

2) Example 1: creating a table with SQL and with the API
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;import java.sql.Timestamp;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.types.Row;

import com.google.common.collect.Lists;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 */
public class TestTableAPIDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		testCreateTableBySQLAndAPI();
	}

	static void testCreateTableBySQLAndAPI() throws Exception {
//		EnvironmentSettings env = EnvironmentSettings.newInstance().inStreamingMode().build();
//		TableEnvironment tenv = TableEnvironment.create(env);
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		// creating the source table with SQL
//		String sourceSql = "CREATE TABLE Alan_KafkaTable (\r\n"
//				+ "  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',\r\n"
//				+ "  `partition` BIGINT METADATA VIRTUAL,\r\n"
//				+ "  `offset` BIGINT METADATA VIRTUAL,\r\n"
//				+ "  user_id BIGINT,\r\n"
//				+ "  item_id BIGINT,\r\n"
//				+ "  behavior STRING\r\n"
//				+ ") WITH (\r\n"
//				+ "  'connector' = 'kafka',\r\n"
//				+ "  'topic' = 'user_behavior',\r\n"
//				+ "  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n"
//				+ "  'properties.group.id' = 'testGroup',\r\n"
//				+ "  'scan.startup.mode' = 'earliest-offset',\r\n"
//				+ "  'format' = 'csv'\r\n"
//				+ ");";
//		tenv.executeSql(sourceSql);

		// creating the same table with the API
		Schema schema = Schema.newBuilder()
				.columnByMetadata("event_time", DataTypes.TIME(3), "timestamp")
				.columnByMetadata("partition", DataTypes.BIGINT(), true)
				.columnByMetadata("offset", DataTypes.BIGINT(), true)
				.column("user_id", DataTypes.BIGINT())
				.column("item_id", DataTypes.BIGINT())
				.column("behavior", DataTypes.STRING())
				.build();

		TableDescriptor kafkaDescriptor = TableDescriptor.forConnector("kafka")
				.comment("kafka source table")
				.schema(schema)
				.option(KafkaConnectorOptions.TOPIC, Lists.newArrayList("user_behavior"))
				.option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092")
				.option(KafkaConnectorOptions.PROPS_GROUP_ID, "testGroup")
				.option("scan.startup.mode", "earliest-offset")
				.format("csv")
				.build();

		tenv.createTemporaryTable("Alan_KafkaTable", kafkaDescriptor);

		// query
		String sql = "select * from Alan_KafkaTable";
		Table resultQuery = tenv.sqlQuery(sql);
		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(resultQuery, Row.class);

		// 6. sink
		resultDS.print();

		// 7. execute
		env.execute();

		// test data entered into kafka:
		// 1,1001,login
		// 1,2001,p_read
		// console output of the running program:
		// 11> (true,+I[16:32:19.923, 0, 0, 1, 1001, login])
		// 11> (true,+I[16:32:32.258, 0, 1, 1, 2001, p_read])
	}

	@Data
	@NoArgsConstructor
	@AllArgsConstructor
	public static class User {
		private long id;
		private String name;
		private int age;
		private Long rowtime;
	}
}

The example above creates a table named Alan_KafkaTable with the kafka connector in two ways, once with SQL and once with the API, and then queries its data. If an aggregation is needed, just write the corresponding SQL directly.
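As a follow-up to that closing note, here is a minimal hedged sketch (not part of the original listing; it reuses the imports above and the registered Alan_KafkaTable) of running such an aggregation as SQL:

	// Hypothetical helper that could be added to TestTableAPIDemo above.
	static void aggregateByBehavior(StreamTableEnvironment tenv) {
		// count events per user and behavior on the registered table
		Table agg = tenv.sqlQuery(
				"SELECT user_id, behavior, COUNT(*) AS cnt FROM Alan_KafkaTable GROUP BY user_id, behavior");
		// the retract stream re-emits updated counts as new Kafka records arrive
		tenv.toRetractStream(agg, Row.class).print();
	}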
3) Example 2: creating a view with SQL and with the API
The overall program skeleton is the same as in example 1; only the view-creation methods are listed here.
static void testCreateViewByAPI() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);// SQL 创建输入表String sourceSql CREATE TABLE Alan_KafkaTable (\r\n event_time TIMESTAMP(3) METADATA FROM timestamp,\r\n partition BIGINT METADATA VIRTUAL,\r\n offset BIGINT METADATA VIRTUAL,\r\n user_id BIGINT,\r\n item_id BIGINT,\r\n behavior STRING\r\n ) WITH (\r\n connector kafka,\r\n topic user_behavior,\r\n properties.bootstrap.servers 192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092,\r\n properties.group.id testGroup,\r\n scan.startup.mode earliest-offset,\r\n format csv\r\n );;tenv.executeSql(sourceSql);// 创建视图String catalogName alan_hive;String defaultDatabase default;String databaseName viewtest_db;String hiveConfDir /usr/local/bigdata/apache-hive-3.1.2-bin/conf;HiveCatalog hiveCatalog new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);tenv.registerCatalog(catalogName, hiveCatalog);tenv.useCatalog(catalogName);hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {}, true);tenv.useDatabase(databaseName);String viewName Alan_KafkaView;String originalQuery select user_id , behavior from Alan_KafkaTable group by user_id ,behavior ;String expandedQuery SELECT user_id , behavior FROM databaseName.Alan_KafkaTable group by user_id ,behavior ; String comment this is a comment;ObjectPath path new ObjectPath(databaseName, viewName);createView(originalQuery,expandedQuery,comment,hiveCatalog,path);// 查询视图String queryViewSQL select * from Alan_KafkaView ;Table queryViewResult tenv.sqlQuery(queryViewSQL);DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(queryViewResult, Row.class);// 6、sinkresultDS.print();// 7、执行env.execute();//kafka中输入测试数据// 1,1001,login// 1,2001,p_read//程序运行控制台输入如下// 3 (true,I[1, login])// 14 (true,I[1, p_read])}static void createView(String originalQuery,String expandedQuery,String comment,HiveCatalog hiveCatalog,ObjectPath path) throws Exception {ResolvedSchema resolvedSchema new ResolvedSchema(Arrays.asList(Column.physical(user_id, DataTypes.INT()),Column.physical(behavior, DataTypes.STRING())),Collections.emptyList(),null);CatalogView origin CatalogView.of(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),comment,originalQuery,expandedQuery,Collections.emptyMap());CatalogView view new ResolvedCatalogView(origin, resolvedSchema);hiveCatalog.createTable(path, view, false);}static void testCreateViewBySQL() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);// SQL 创建输入表String sourceSql CREATE TABLE Alan_KafkaTable (\r\n event_time TIMESTAMP(3) METADATA FROM timestamp,\r\n partition BIGINT METADATA VIRTUAL,\r\n offset BIGINT METADATA VIRTUAL,\r\n user_id BIGINT,\r\n item_id BIGINT,\r\n behavior STRING\r\n ) WITH (\r\n connector kafka,\r\n topic user_behavior,\r\n properties.bootstrap.servers 192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092,\r\n properties.group.id testGroup,\r\n scan.startup.mode earliest-offset,\r\n format csv\r\n );;tenv.executeSql(sourceSql);//String sql select user_id , behavior from Alan_KafkaTable group by user_id ,behavior ;Table resultQuery tenv.sqlQuery(sql);tenv.createTemporaryView(Alan_KafkaView, resultQuery);String queryViewSQL select * from Alan_KafkaView ;Table queryViewResult tenv.sqlQuery(queryViewSQL);DataStreamTuple2Boolean, Row resultDS 
				tenv.toRetractStream(queryViewResult, Row.class);
		// 6. sink
		resultDS.print();
		// 7. execute
		env.execute();

		// test data entered into kafka:
		// 1,1001,login
		// 1,2001,p_read
		// console output of the running program:
		// 3> (true,+I[1, login])
		// 14> (true,+I[1, p_read])
	}

This example creates a view with SQL and with the API; the view is the result of grouping by user_id and behavior. If aggregation is needed, just use SQL directly.
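Because the listing above lost its quoting during publication, here is a compact, hedged sketch of the pure Table API route used in testCreateViewBySQL (it assumes the Alan_KafkaTable source table from the examples above is already registered):

	// Hypothetical recap; reuses the imports and the registered source table from above.
	static void registerGroupedView(StreamTableEnvironment tenv) {
		// build the grouped result as a Table object ...
		Table grouped = tenv.sqlQuery(
				"SELECT user_id, behavior FROM Alan_KafkaTable GROUP BY user_id, behavior");
		// ... and register it under a name so that later SQL can refer to it
		tenv.createTemporaryView("Alan_KafkaView", grouped);
		tenv.sqlQuery("SELECT * FROM Alan_KafkaView").execute().print();
	}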
4) Example: querying a table with window functions via the API
This example implements Tumble and Over windows. For the SQL window functions, see: 27. Flink SQL SELECT (group aggregation, over aggregation, and window joins), part 5.
	static void testQueryTableWithWindwosByAPI() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		DataStream<User> users = env.fromCollection(userList)
				.assignTimestampsAndWatermarks(
						WatermarkStrategy.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
								.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime()));

		Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("age"), $("rt").rowtime());

		// tumble
		Table result = usersTable.filter(and(
//				$("name").equals("alanchan"),
//				$("age").between(1, 20),
				$("name").isNotNull(),
				$("age").isGreaterOrEqual(19)))
				// define a tumbling window and give it an alias
				.window(Tumble.over(lit(1).hours()).on($("rt")).as("hourlyWindow"))
				// the window must appear among the grouping fields
				.groupBy($("name"), $("hourlyWindow"))
				.select($("name"), $("name").count().as("count(*)"), $("hourlyWindow").start(), $("hourlyWindow").end());

		result.printSchema();
		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
		resultDS.print();

		// over
		usersTable.window(Over.partitionBy($("name"))
						.orderBy($("rt"))
						.preceding(unresolvedCall(BuiltInFunctionDefinitions.UNBOUNDED_RANGE))
						.as("hourlyWindow"))
				.select($("id"), $("rt"), $("id").count().over($("hourlyWindow")).as("count_t"))
				.execute().print();

		env.execute();
	}

The Table API is available in Scala, Java, and Python. The Scala Table API builds on Scala expressions; the Java Table API supports both the expression DSL and strings that are parsed and converted into equivalent expressions; the Python Table API only supports strings that are parsed and converted into equivalent expressions.
Overall, manipulating Flink tables through the API can be fairly cumbersome; in most cases plain SQL is simpler, and the API serves as a complement for cases SQL cannot express.
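To illustrate that point, here is a hedged sketch of how the hourly count from the window example could instead be written as SQL with the TUMBLE window table-valued function (the view name users_table is an assumption, not something the article registers):

		// Hypothetical: the DataStream-backed table would first need a name visible to SQL,
		// e.g. tenv.createTemporaryView("users_table", usersTable);
		Table hourlyCounts = tenv.sqlQuery(
				"SELECT name, COUNT(*) AS cnt, window_start, window_end "
				+ "FROM TABLE(TUMBLE(TABLE users_table, DESCRIPTOR(rt), INTERVAL '1' HOUR)) "
				+ "GROUP BY name, window_start, window_end");
		hourlyCounts.execute().print();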
2. Table query and filter operations
The Table API supports the operations below. Note that not every operation is available in both streaming and batch mode; each operation carries the corresponding marker. The concrete examples follow; the run results are shown as comments in the source.

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;
import static org.apache.flink.table.api.Expressions.and;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;/*** author alanchan**/
public class TestTableAPIOperationDemo {static String sourceSql CREATE TABLE Alan_KafkaTable (\r\n event_time TIMESTAMP(3) METADATA FROM timestamp,\r\n partition BIGINT METADATA VIRTUAL,\r\n offset BIGINT METADATA VIRTUAL,\r\n user_id BIGINT,\r\n item_id BIGINT,\r\n behavior STRING\r\n ) WITH (\r\n connector kafka,\r\n topic user_behavior,\r\n properties.bootstrap.servers 192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092,\r\n properties.group.id testGroup,\r\n scan.startup.mode earliest-offset,\r\n format csv\r\n );;/*** param args* throws Exception*/public static void main(String[] args) throws Exception {
// test1();
// test2();test3();}static void test3() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);// 建表tenv.executeSql(sourceSql);Table table1 tenv.from(Alan_KafkaTable);// 重命名字段。Table result table1.as(a,b,c,d,e,f);DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(result, Row.class);resultDS.print();//11 (true,I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])//和 SQL 的 WHERE 子句类似。 过滤掉未验证通过过滤谓词的行。Table table2 result.where($(f).isEqual(login));DataStreamTuple2Boolean, Row result2DS tenv.toRetractStream(table2, Row.class);result2DS.print();//11 (true,I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])Table table3 result.where($(f).isNotEqual(login));DataStreamTuple2Boolean, Row result3DS tenv.toRetractStream(table3, Row.class);result3DS.print();// 没有匹配条件的记录无输出Table table4 result.filter(and($(f).isNotNull(),
// $(d).isGreater(1)$(e).isNotNull()));DataStreamTuple2Boolean, Row result4DS tenv.toRetractStream(table4, Row.class);result4DS.print(test filter:);//test filter::11 (true,I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])env.execute();}/*** 和 SQL 查询中的 VALUES 子句类似。 基于提供的行生成一张内联表。* * 你可以使用 row(...) 表达式创建复合行* * throws Exception*/static void test2() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);Table table tenv.fromValues(row(1, ABC), row(2L, ABCDE));table.printSchema();
// (
// f0 BIGINT NOT NULL,
// f1 VARCHAR(5) NOT NULL
// )DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(table, Row.class);resultDS.print();
// 1 (true,I[2, ABCDE])
// 2 (true,I[1, ABC])Table table2 tenv.fromValues(DataTypes.ROW(DataTypes.FIELD(id, DataTypes.DECIMAL(10, 2)),DataTypes.FIELD(name, DataTypes.STRING())),row(1, ABCD),row(2L, ABCDEF));table2.printSchema();
// (
// id DECIMAL(10, 2),
// name STRING
// )DataStreamTuple2Boolean, Row result2DS tenv.toRetractStream(table2, Row.class);result2DS.print();
// 15 (true,I[2.00, ABCDEF])
// 14 (true,I[1.00, ABCD])env.execute();}/*** 和 SQL 查询的 FROM 子句类似。 执行一个注册过的表的扫描。* * throws Exception*/static void test1() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);// 建表tenv.executeSql(sourceSql);// 查询
// tenv.from(Alan_KafkaTable).execute().print();// kafka输入数据// 1,1002,login// 应用程序控制台输出如下
// -----------------------------------------------------------------------------------------------------------------------------------------------------
// | op | event_time | partition | offset | user_id | item_id | behavior |
// -----------------------------------------------------------------------------------------------------------------------------------------------------
// | I | 2023-11-01 11:00:30.183 | 0 | 2 | 1 | 1002 | login |Table temp tenv.from(Alan_KafkaTable);//和 SQL 的 SELECT 子句类似。 执行一个 select 操作Table result1 temp.select($(user_id), $(item_id).as(behavior), $(event_time));DataStreamTuple2Boolean, Row result1DS tenv.toRetractStream(result1, Row.class);
// result1DS.print();
// 11 (true,I[1, 1002, 2023-11-01T11:00:30.183])//选择星号*作为通配符select 表中的所有列。Table result2 temp.select($(*));DataStreamTuple2Boolean, Row result2DS tenv.toRetractStream(result2, Row.class);result2DS.print();
// 11 (true,I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])env.execute();}}
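As a compact, hedged recap of this section (illustrative only; table1 stands for the Alan_KafkaTable scan from test3() above, and the variable names are hypothetical):

		Table renamed = table1.as("a", "b", "c", "d", "e", "f");      // rename all columns
		Table logins  = renamed.where($("f").isEqual("login"));       // WHERE-style predicate
		Table nonNull = renamed.filter(                                // filter() behaves like where()
				$("f").isNotNull().and($("e").isNotNull()));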
3. Table column operations
The concrete examples follow; the run results are shown as comments in the source.
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.concat;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;/*** author alanchan**/
public class TestTableAPIOperationDemo {static String sourceSql CREATE TABLE Alan_KafkaTable (\r\n event_time TIMESTAMP(3) METADATA FROM timestamp,\r\n partition BIGINT METADATA VIRTUAL,\r\n offset BIGINT METADATA VIRTUAL,\r\n user_id BIGINT,\r\n item_id BIGINT,\r\n behavior STRING\r\n ) WITH (\r\n connector kafka,\r\n topic user_behavior,\r\n properties.bootstrap.servers 192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092,\r\n properties.group.id testGroup,\r\n scan.startup.mode earliest-offset,\r\n format csv\r\n );;/*** param args* throws Exception*/public static void main(String[] args) throws Exception {
// test1();
// test2();test3();}static void test3() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);// 建表tenv.executeSql(sourceSql);Table table1 tenv.from(Alan_KafkaTable);// 重命名字段。Table result table1.as(a,b,c,d,e,f);DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(result, Row.class);resultDS.print();//11 (true,I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])//和 SQL 的 WHERE 子句类似。 过滤掉未验证通过过滤谓词的行。Table table2 result.where($(f).isEqual(login));DataStreamTuple2Boolean, Row result2DS tenv.toRetractStream(table2, Row.class);result2DS.print();//11 (true,I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])Table table3 result.where($(f).isNotEqual(login));DataStreamTuple2Boolean, Row result3DS tenv.toRetractStream(table3, Row.class);result3DS.print();// 没有匹配条件的记录无输出Table table4 result.filter(and($(f).isNotNull(),
// $(d).isGreater(1)$(e).isNotNull()));DataStreamTuple2Boolean, Row result4DS tenv.toRetractStream(table4, Row.class);result4DS.print(test filter:);//test filter::11 (true,I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])env.execute();}/*** 和 SQL 查询中的 VALUES 子句类似。 基于提供的行生成一张内联表。* * 你可以使用 row(...) 表达式创建复合行* * throws Exception*/static void test2() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);Table table tenv.fromValues(row(1, ABC), row(2L, ABCDE));table.printSchema();
// (
// f0 BIGINT NOT NULL,
// f1 VARCHAR(5) NOT NULL
// )DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(table, Row.class);resultDS.print();
// 1 (true,I[2, ABCDE])
// 2 (true,I[1, ABC])Table table2 tenv.fromValues(DataTypes.ROW(DataTypes.FIELD(id, DataTypes.DECIMAL(10, 2)),DataTypes.FIELD(name, DataTypes.STRING())),row(1, ABCD),row(2L, ABCDEF));table2.printSchema();
// (
// id DECIMAL(10, 2),
// name STRING
// )DataStreamTuple2Boolean, Row result2DS tenv.toRetractStream(table2, Row.class);result2DS.print();
// 15 (true,I[2.00, ABCDEF])
// 14 (true,I[1.00, ABCD])env.execute();}/*** 和 SQL 查询的 FROM 子句类似。 执行一个注册过的表的扫描。* * throws Exception*/static void test1() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);// 建表tenv.executeSql(sourceSql);// 查询
// tenv.from(Alan_KafkaTable).execute().print();// kafka输入数据// 1,1002,login// 应用程序控制台输出如下
// -----------------------------------------------------------------------------------------------------------------------------------------------------
// | op | event_time | partition | offset | user_id | item_id | behavior |
// -----------------------------------------------------------------------------------------------------------------------------------------------------
// | I | 2023-11-01 11:00:30.183 | 0 | 2 | 1 | 1002 | login |Table temp tenv.from(Alan_KafkaTable);//和 SQL 的 SELECT 子句类似。 执行一个 select 操作Table result1 temp.select($(user_id), $(item_id).as(behavior), $(event_time));DataStreamTuple2Boolean, Row result1DS tenv.toRetractStream(result1, Row.class);
// result1DS.print();
// 11 (true,I[1, 1002, 2023-11-01T11:00:30.183])//选择星号*作为通配符select 表中的所有列。Table result2 temp.select($(*));DataStreamTuple2Boolean, Row result2DS tenv.toRetractStream(result2, Row.class);result2DS.print();
// 11 (true,I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])env.execute();}static void test5() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);// 建表tenv.executeSql(sourceSql);Table table tenv.from(Alan_KafkaTable);//和 SQL 的 GROUP BY 子句类似。 使用分组键对行进行分组使用伴随的聚合算子来按照组进行聚合行。Table result table.groupBy($(user_id)).select($(user_id), $(user_id).count().as(count(user_id)));DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(result, Row.class);resultDS.print();
// 12 (true,I[1, 1])env.execute();}static void test4() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);// 建表tenv.executeSql(sourceSql);Table table tenv.from(Alan_KafkaTable);//执行字段添加操作。 如果所添加的字段已经存在将抛出异常。Table result2 table.addColumns($(behavior).plus(1).as(t_col1));result2.printSchema();
// (
// event_time TIMESTAMP(3),
// partition BIGINT,
// offset BIGINT,
// user_id BIGINT,
// item_id BIGINT,
// behavior STRING,
// t_col1 STRING
// )Table result table.addColumns($(behavior).plus(1).as(t_col3), concat($(behavior), alanchan).as(t_col4));result.printSchema();
// (
// event_time TIMESTAMP(3),
// partition BIGINT,
// offset BIGINT,
// user_id BIGINT,
// item_id BIGINT,
// behavior STRING,
// t_col3 STRING,
// t_col4 STRING
// )Table result3 table.addColumns(concat($(behavior), alanchan).as(t_col4));result3.printSchema();
// (
// event_time TIMESTAMP(3),
// partition BIGINT,
// offset BIGINT,
// user_id BIGINT,
// item_id BIGINT,
// behavior STRING,
// t_col4 STRING
// )//执行字段添加操作。 如果添加的列名称和已存在的列名称相同则已存在的字段将被替换。 此外如果添加的字段里面有重复的字段名则会使用最后一个字段。Table result4 result3.addOrReplaceColumns(concat($(t_col4), alanchan).as(t_col));result4.printSchema();
// (
// event_time TIMESTAMP(3),
// partition BIGINT,
// offset BIGINT,
// user_id BIGINT,
// item_id BIGINT,
// behavior STRING,
// t_col4 STRING,
// t_col STRING
// )Table result5 result4.dropColumns($(t_col4), $(t_col));result5.printSchema();
// (
// event_time TIMESTAMP(3),
// partition BIGINT,
// offset BIGINT,
// user_id BIGINT,
// item_id BIGINT,
// behavior STRING
// )//执行字段重命名操作。 字段表达式应该是别名表达式并且仅当字段已存在时才能被重命名。Table result6 result4.renameColumns($(t_col4).as(col1), $(t_col).as(col2));result6.printSchema();
// (
// event_time TIMESTAMP(3),
// partition BIGINT,
// offset BIGINT,
// user_id BIGINT,
// item_id BIGINT,
// behavior STRING,
// col1 STRING,
// col2 STRING
// )DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(table, Row.class);resultDS.print();
// 11 (true,I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])env.execute();}
}
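A short, hedged recap of the column operations shown in test4() above (t stands for any table with a STRING column named behavior; the added column names are illustrative, and the snippet reuses the static concat import listed above):

		Table added    = t.addColumns(concat($("behavior"), "_tag").as("t_col"));         // fails if t_col already exists
		Table replaced = added.addOrReplaceColumns(concat($("t_col"), "_x").as("t_col")); // overwrites an existing column
		Table renamed  = replaced.renameColumns($("t_col").as("col1"));                   // only existing columns can be renamed
		Table dropped  = renamed.dropColumns($("col1"));                                  // removes the column again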
4. Table aggregation operations
1) Common part of the example code
This part only contains the shared pieces used by the aggregation examples, such as the User class definition and the required imports.

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;import java.time.Duration;
import java.util.Arrays;
import java.util.List;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.types.Row;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** author alanchan**/
public class TestTableAPIOperationDemo2 {final static ListUser userList Arrays.asList(new User(1L, alan, 18, 1698742358391L), new User(2L, alan, 19, 1698742359396L), new User(3L, alan, 25, 1698742360407L),new User(4L, alanchan, 28, 1698742361409L), new User(5L, alanchan, 29, 1698742362424L));/*** param args* throws Exception*/public static void main(String[] args) throws Exception {
// test1();
// test2();
// test3();test4();}DataNoArgsConstructorAllArgsConstructorpublic static class User {private long id;private String name;private int balance;private Long rowtime;}}
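Before the individual operations, here is a hedged sketch (names follow the common code above; the helper itself is hypothetical) of the stream-to-table conversion that each of the following examples repeats: assign event-time watermarks on the DataStream, then declare rowtime as a rowtime attribute when turning it into a Table.

	static Table usersAsTable(StreamExecutionEnvironment env, StreamTableEnvironment tenv) {
		DataStream<User> users = env.fromCollection(userList)
				.assignTimestampsAndWatermarks(
						WatermarkStrategy.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
								.withTimestampAssigner((user, ts) -> user.getRowtime()));
		// the last expression exposes the event time as a rowtime attribute usable by windows
		return tenv.fromDataStream(users, $("id"), $("name"), $("balance"), $("rowtime").rowtime());
	}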
2) group by
This example only demonstrates the group by operation and is fairly simple.

	static void test2() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		// create the source table
		tenv.executeSql(sourceSql);
		Table table = tenv.from("Alan_KafkaTable");

		// Similar to the SQL GROUP BY clause. Groups the rows by the grouping key and
		// aggregates each group with the accompanying aggregate operator.
		Table result = table.groupBy($("user_id"))
				.select($("user_id"), $("user_id").count().as("count(user_id)"));
		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
		resultDS.print();
		// 12> (true,+I[1, 1])
		env.execute();
	}

3) GroupBy Window Aggregation
Groups and aggregates a table on a group window, together with one or more grouping keys.
static void test2() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);DataStreamUser users env.fromCollection(userList).assignTimestampsAndWatermarks(WatermarkStrategy.UserforBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((user, recordTimestamp) - user.getRowtime()));Table usersTable tenv.fromDataStream(users, $(id), $(name), $(balance),$(rowtime).rowtime());//使用分组窗口结合单个或者多个分组键对表进行分组和聚合。Table result usersTable.window(Tumble.over(lit(5).minutes()).on($(rowtime)).as(w)) // 定义窗口.groupBy($(name), $(w)) // 按窗口和键分组// 访问窗口属性并聚合.select($(name),$(w).start(),$(w).end(),$(w).rowtime(),$(balance).sum().as(sum(balance)));DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(result, Row.class);resultDS.print();
// 2 (true,I[alan, 2023-10-31T08:50, 2023-10-31T08:55, 2023-10-31T08:54:59.999, 62])
		// 16> (true,+I[alanchan, 2023-10-31T08:50, 2023-10-31T08:55, 2023-10-31T08:54:59.999, 57])
		env.execute();
	}

4) Over Window Aggregation
Similar to the SQL OVER clause.
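For readability, here is a cleaned-up, hedged sketch of the over-window definition used in the full listing below; it uses the public constant Expressions.UNBOUNDED_RANGE instead of the internal unresolvedCall(...) workaround in the original code:

		// unbounded event-time range per name, i.e. from the start of the partition up to the current row
		OverWindow w = Over.partitionBy($("name"))
				.orderBy($("rowtime"))
				.preceding(UNBOUNDED_RANGE)   // import static org.apache.flink.table.api.Expressions.UNBOUNDED_RANGE;
				.as("w");
		Table agg = usersTable.window(w)
				.select($("id"),
						$("balance").avg().over($("w")),
						$("balance").max().over($("w")),
						$("balance").min().over($("w")));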
static void test3() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);DataStreamUser users env.fromCollection(userList).assignTimestampsAndWatermarks(WatermarkStrategy.UserforBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((user, recordTimestamp) - user.getRowtime()));Table usersTable tenv.fromDataStream(users, $(id), $(name), $(balance),$(rowtime).rowtime());// 所有的聚合必须定义在同一个窗口上比如同一个分区、排序和范围内。目前只支持 PRECEDING 到当前行范围无界或有界的窗口。//尚不支持 FOLLOWING 范围的窗口。ORDER BY 操作必须指定一个单一的时间属性。Table result usersTable// 定义窗口.window(Over.partitionBy($(name)).orderBy($(rowtime)).preceding(unresolvedCall(BuiltInFunctionDefinitions.UNBOUNDED_RANGE)).following(unresolvedCall(BuiltInFunctionDefinitions.CURRENT_RANGE)).as(w))// 滑动聚合.select($(id),$(balance).avg().over($(w)),$(balance).max().over($(w)),$(balance).min().over($(w)));DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(result, Row.class);resultDS.print();
// 2 (true,I[1, 18, 18, 18])
// 16 (true,I[4, 28, 28, 28])
// 2 (true,I[2, 18, 19, 18])
// 16 (true,I[5, 28, 29, 28])
		// 2> (true,+I[3, 20, 25, 18])
		env.execute();
	}

5) Distinct Aggregation
/*** 和 SQL DISTINCT 聚合子句类似例如 COUNT(DISTINCT a)。 * Distinct 聚合声明的聚合函数内置或用户定义的仅应用于互不相同的输入值。 * Distinct 可以应用于 GroupBy Aggregation、GroupBy Window Aggregation 和 Over Window Aggregation。* throws Exception*/static void test4() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);DataStreamUser users env.fromCollection(userList).assignTimestampsAndWatermarks(WatermarkStrategy.UserforBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((user, recordTimestamp) - user.getRowtime()));Table usersTable tenv.fromDataStream(users, $(id), $(name), $(balance),$(rowtime).rowtime());// 按属性分组后的的互异互不相同、去重聚合Table groupByDistinctResult usersTable.groupBy($(name)).select($(name), $(balance).sum().distinct().as(sum_balance));DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(groupByDistinctResult, Row.class);
// resultDS.print();
// 2 (true,I[alan, 18])
// 16 (true,I[alanchan, 28])
// 16 (false,-U[alanchan, 28])
// 2 (false,-U[alan, 18])
// 16 (true,U[alanchan, 57])
// 2 (true,U[alan, 37])
// 2 (false,-U[alan, 37])
// 2 (true,U[alan, 62])//按属性、时间窗口分组后的互异互不相同、去重聚合Table groupByWindowDistinctResult usersTable.window(Tumble.over(lit(5).minutes()).on($(rowtime)).as(w)).groupBy($(name), $(w)).select($(name), $(balance).sum().distinct().as(sum_balance));DataStreamTuple2Boolean, Row result2DS tenv.toRetractStream(groupByDistinctResult, Row.class);
// result2DS.print();
// 16 (true,I[alanchan, 28])
// 2 (true,I[alan, 18])
// 16 (false,-U[alanchan, 28])
// 2 (false,-U[alan, 18])
// 16 (true,U[alanchan, 57])
// 2 (true,U[alan, 37])
// 2 (false,-U[alan, 37])
// 2 (true,U[alan, 62])//over window 上的互异互不相同、去重聚合Table result usersTable.window(Over.partitionBy($(name)).orderBy($(rowtime)).preceding(unresolvedCall(BuiltInFunctionDefinitions.UNBOUNDED_RANGE)).as(w)).select($(name), $(balance).avg().distinct().over($(w)),$(balance).max().over($(w)),$(balance).min().over($(w)));DataStreamTuple2Boolean, Row result3DS tenv.toRetractStream(result, Row.class);result3DS.print();
// 16 (true,I[alanchan, 28, 28, 28])
// 2 (true,I[alan, 18, 18, 18])
// 2 (true,I[alan, 18, 19, 18])
// 16 (true,I[alanchan, 28, 29, 28])
		// 2> (true,+I[alan, 20, 25, 18])
		env.execute();
	}

User-defined aggregate functions can also be used with the DISTINCT modifier. To compute the aggregate over distinct (deduplicated) values only, simply add the distinct modifier to the aggregate function call.
Table orders = tEnv.from("Orders");

// use a distinct aggregation with a user-defined aggregate function
tEnv.registerFunction("myUdagg", new MyUdagg());
orders.groupBy($("users"))
      .select($("users"), call("myUdagg", $("points")).distinct().as("myDistinctResult"));

6) Distinct
Similar to the SQL DISTINCT clause. Returns records with distinct value combinations.

	static void test5() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		List<User> userList = Arrays.asList(
				new User(1L, "alan", 18, 1698742358391L),
				new User(2L, "alan", 19, 1698742359396L),
				new User(3L, "alan", 25, 1698742360407L),
				new User(4L, "alanchan", 28, 1698742361409L),
				new User(5L, "alanchan", 29, 1698742362424L),
				new User(5L, "alanchan", 29, 1698742362424L));

		DataStream<User> users = env.fromCollection(userList)
				.assignTimestampsAndWatermarks(
						WatermarkStrategy.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
								.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime()));

		Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"), $("rowtime").rowtime());

//		Table orders = tableEnv.from("Orders");
		Table result = usersTable.distinct();
		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
		resultDS.print();
		// the data set has 6 records, one of which is a duplicate, so only 5 rows are emitted
// 9 (true,I[2, alan, 19, 2023-10-31T08:52:39.396])
// 1 (true,I[1, alan, 18, 2023-10-31T08:52:38.391])
// 13 (true,I[3, alan, 25, 2023-10-31T08:52:40.407])
// 7 (true,I[4, alanchan, 28, 2023-10-31T08:52:41.409])
		// 13> (true,+I[5, alanchan, 29, 2023-10-31T08:52:42.424])
		env.execute();
	}

5. Table join operations
This part covers the main table join operations, such as inner joins, outer joins, and joins with user-defined functions; the temporal table join is illustrated with a Scala example. Joins with user-defined functions are only sketched here and will be verified in the articles on Flink user-defined functions, because using functions is not closely tied to the join itself. See: 19. User-defined functions in the Flink Table API and SQL, part 2.
1) Join examples
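Before the full listing, a minimal hedged sketch of the inner and left outer join calls it demonstrates (left and right correspond to the renamed user and order tables defined further down; the two inputs must not share field names):

		Table inner = left.join(right)
				.where($("user_id").isEqual($("userId")))
				.select($("orderId"), $("user_id"), $("amount"), $("name"));

		Table leftOuter = left.leftOuterJoin(right, $("user_id").isEqual($("userId")))
				.select($("orderId"), $("user_id"), $("amount"), $("name"));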
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.call;import java.util.Arrays;
import java.util.List;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** author alanchan**/
public class TestTableAPIJoinOperationDemo {DataNoArgsConstructorAllArgsConstructorpublic static class User {private long id;private String name;private double balance;private Long rowtime;}DataNoArgsConstructorAllArgsConstructorpublic static class Order {private long id;private long user_id;private double amount;private Long rowtime;}final static ListUser userList Arrays.asList(new User(1L, alan, 18, 1698742358391L), new User(2L, alan, 19, 1698742359396L), new User(3L, alan, 25, 1698742360407L),new User(4L, alanchan, 28, 1698742361409L), new User(5L, alanchan, 29, 1698742362424L));final static ListOrder orderList Arrays.asList(new Order(1L, 1, 18, 1698742358391L), new Order(2L, 2, 19, 1698742359396L), new Order(3L, 1, 25, 1698742360407L),new Order(4L, 3, 28, 1698742361409L), new Order(5L, 1, 29, 1698742362424L),new Order(6L, 4, 49, 1698742362424L));static void testInnerJoin() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);DataStreamUser users env.fromCollection(userList);Table usersTable tenv.fromDataStream(users, $(id), $(name),$(balance),$(rowtime));DataStreamOrder orders env.fromCollection(orderList);Table ordersTable tenv.fromDataStream(orders, $(id), $(user_id), $(amount),$(rowtime));Table left usersTable.select($(id).as(userId), $(name), $(balance),$(rowtime).as(u_rowtime));Table right ordersTable.select($(id).as(orderId), $(user_id), $(amount),$(rowtime).as(o_rowtime));Table result left.join(right).where($(user_id).isEqual($(userId))).select($(orderId), $(user_id), $(amount),$(o_rowtime),$(name),$(balance));DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(result, Row.class);resultDS.print();
// 15 (true,I[4, 3, 28.0, 1698742361409, alan, 25])
// 12 (true,I[1, 1, 18.0, 1698742358391, alan, 18])
// 3 (true,I[6, 4, 49.0, 1698742362424, alanchan, 28])
// 12 (true,I[2, 2, 19.0, 1698742359396, alan, 19])
// 12 (true,I[3, 1, 25.0, 1698742360407, alan, 18])
// 12 (true,I[5, 1, 29.0, 1698742362424, alan, 18])env.execute();}/*** 和 SQL LEFT/RIGHT/FULL OUTER JOIN 子句类似。 关联两张表。 两张表必须有不同的字段名并且必须定义至少一个等式连接谓词。* throws Exception*/static void testOuterJoin() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);DataStreamUser users env.fromCollection(userList);Table usersTable tenv.fromDataStream(users, $(id), $(name),$(balance),$(rowtime));DataStreamOrder orders env.fromCollection(orderList);Table ordersTable tenv.fromDataStream(orders, $(id), $(user_id), $(amount),$(rowtime));Table left usersTable.select($(id).as(userId), $(name), $(balance),$(rowtime).as(u_rowtime));Table right ordersTable.select($(id).as(orderId), $(user_id), $(amount),$(rowtime).as(o_rowtime));Table leftOuterResult left.leftOuterJoin(right, $(user_id).isEqual($(userId))).select($(orderId), $(user_id), $(amount),$(o_rowtime),$(name),$(balance));DataStreamTuple2Boolean, Row leftOuterResultDS tenv.toRetractStream(leftOuterResult, Row.class);
// leftOuterResultDS.print();
// 12 (true,I[null, null, null, null, alan, 18])
// 3 (true,I[null, null, null, null, alanchan, 28])
// 12 (false,-D[null, null, null, null, alan, 18])
// 12 (true,I[1, 1, 18.0, 1698742358391, alan, 18])
// 15 (true,I[4, 3, 28.0, 1698742361409, alan, 25])
// 12 (true,I[null, null, null, null, alan, 19])
// 3 (false,-D[null, null, null, null, alanchan, 28])
// 12 (false,-D[null, null, null, null, alan, 19])
// 3 (true,I[6, 4, 49.0, 1698742362424, alanchan, 28])
// 12 (true,I[2, 2, 19.0, 1698742359396, alan, 19])
// 12 (true,I[3, 1, 25.0, 1698742360407, alan, 18])
// 3 (true,I[null, null, null, null, alanchan, 29])
// 12 (true,I[5, 1, 29.0, 1698742362424, alan, 18])Table rightOuterResult left.rightOuterJoin(right, $(user_id).isEqual($(userId))).select($(orderId), $(user_id), $(amount),$(o_rowtime),$(name),$(balance));DataStreamTuple2Boolean, Row rightOuterResultDS tenv.toRetractStream(rightOuterResult, Row.class);
// rightOuterResultDS.print();
// 12 (true,I[1, 1, 18.0, 1698742358391, alan, 18])
// 3 (true,I[6, 4, 49.0, 1698742362424, alanchan, 28])
// 15 (true,I[4, 3, 28.0, 1698742361409, alan, 25])
// 12 (true,I[2, 2, 19.0, 1698742359396, alan, 19])
// 12 (true,I[3, 1, 25.0, 1698742360407, alan, 18])
// 12 (true,I[5, 1, 29.0, 1698742362424, alan, 18])Table fullOuterResult left.fullOuterJoin(right, $(user_id).isEqual($(userId))).select($(orderId), $(user_id), $(amount),$(o_rowtime),$(name),$(balance));DataStreamTuple2Boolean, Row fullOuterResultDS tenv.toRetractStream(fullOuterResult, Row.class);fullOuterResultDS.print();
// 3 (true,I[6, 4, 49.0, 1698742362424, null, null])
// 12 (true,I[1, 1, 18.0, 1698742358391, null, null])
// 15 (true,I[4, 3, 28.0, 1698742361409, null, null])
// 12 (false,-D[1, 1, 18.0, 1698742358391, null, null])
// 3 (false,-D[6, 4, 49.0, 1698742362424, null, null])
// 12 (true,I[1, 1, 18.0, 1698742358391, alan, 18])
// 3 (true,I[6, 4, 49.0, 1698742362424, alanchan, 28])
// 3 (true,I[null, null, null, null, alanchan, 29])
// 12 (true,I[2, 2, 19.0, 1698742359396, null, null])
// 12 (false,-D[2, 2, 19.0, 1698742359396, null, null])
// 12 (true,I[2, 2, 19.0, 1698742359396, alan, 19])
// 15 (false,-D[4, 3, 28.0, 1698742361409, null, null])
// 12 (true,I[3, 1, 25.0, 1698742360407, alan, 18])
// 15 (true,I[4, 3, 28.0, 1698742361409, alan, 25])
// 12 (true,I[5, 1, 29.0, 1698742362424, alan, 18])env.execute();}/*** Interval join 是可以通过流模式处理的常规 join 的子集。* Interval join 至少需要一个 equi-join 谓词和一个限制双方时间界限的 join 条件。* 这种条件可以由两个合适的范围谓词、、、或一个比较两个输入表相同时间属性即处理时间或事件时间的等值谓词来定义。* throws Exception*/static void testIntervalJoin() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);DataStreamUser users env.fromCollection(userList);Table usersTable tenv.fromDataStream(users, $(id), $(name),$(balance),$(rowtime));DataStreamOrder orders env.fromCollection(orderList);Table ordersTable tenv.fromDataStream(orders, $(id), $(user_id), $(amount),$(rowtime));Table left usersTable.select($(id).as(userId), $(name), $(balance),$(rowtime).as(u_rowtime));Table right ordersTable.select($(id).as(orderId), $(user_id), $(amount),$(rowtime).as(o_rowtime));Table result left.join(right).where(and($(user_id).isEqual($(userId)),$(user_id).isLess(3)
// $(u_rowtime).isGreaterOrEqual($(o_rowtime).minus(lit(5).minutes())),
// $(u_rowtime).isLess($(o_rowtime).plus(lit(10).minutes())))).select($(orderId), $(user_id), $(amount),$(o_rowtime),$(name),$(balance));result.printSchema();DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(result, Row.class);resultDS.print();
// 12 (true,I[1, 1, 18.0, 1698742358391, alan, 18.0])
// 12 (true,I[2, 2, 19.0, 1698742359396, alan, 19.0])
// 12 (true,I[3, 1, 25.0, 1698742360407, alan, 18.0])
// 12 (true,I[5, 1, 29.0, 1698742362424, alan, 18.0])env.execute();}/*** join 表和表函数的结果。左外部表的每一行都会 join 表函数相应调用产生的所有行。 * 如果表函数调用返回空结果则删除左侧外部表的一行。* 该示例为示例性的具体的验证将在自定义函数中进行说明* * throws Exception*/static void testInnerJoinWithUDTF() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);// 注册 User-Defined Table FunctionTableFunctionTuple3String,String,String split new SplitFunction();tenv.registerFunction(split, split);// joinDataStreamOrder orders env.fromCollection(orderList);Table ordersTable tenv.fromDataStream(orders, $(id), $(user_id), $(amount),$(rowtime));Table result ordersTable.joinLateral(call(split, $(c)).as(s, t, v)).select($(a), $(b), $(s), $(t), $(v));env.execute();}/*** join 表和表函数的结果。左外部表的每一行都会 join 表函数相应调用产生的所有行。* 如果表函数调用返回空结果则保留相应的 outer外部连接行并用空值填充右侧结果。* 目前表函数左外连接的谓词只能为空或字面常量真。* 该示例为示例性的具体的验证将在自定义函数中进行说明* * throws Exception*/static void testLeftOuterJoinWithUDTF() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);// 注册 User-Defined Table FunctionTableFunctionTuple3String,String,String split new SplitFunction();tenv.registerFunction(split, split);// joinDataStreamOrder orders env.fromCollection(orderList);Table ordersTable tenv.fromDataStream(orders, $(id), $(user_id), $(amount),$(rowtime));Table result ordersTable.leftOuterJoinLateral(call(split, $(c)).as(s, t, v)).select($(a), $(b), $(s), $(t), $(v));env.execute();}/*** Temporal table 是跟踪随时间变化的表。* Temporal table 函数提供对特定时间点 temporal table 状态的访问。* 表与 temporal table 函数进行 join 的语法和使用表函数进行 inner join 的语法相同。* 目前仅支持与 temporal table 的 inner join。* * throws Exception*/static void testJoinWithTemporalTable() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);Table ratesHistory tenv.from(RatesHistory);// 注册带有时间属性和主键的 temporal table functionTemporalTableFunction rates ratesHistory.createTemporalTableFunction($(r_proctime),$(r_currency));tenv.registerFunction(rates, rates);// 基于时间属性和键与“Orders”表关联Table orders tenv.from(Orders);Table result orders.joinLateral(call(rates, $(o_proctime)), $(o_currency).isEqual($(r_currency)));env.execute();}/*** param args* throws Exception */public static void main(String[] args) throws Exception {
// testInnerJoin();
// testOuterJoin();
// testIntervalJoin();testInnerJoinWithUDTF();}static class SplitFunction extends TableFunctionTuple3String,String,String{public void eval(Tuple3String,String,String tp) {// for (String s : str.split(,)) {
// // use collect(...) to emit a rowcollect(Row.of(s, s.length()));
// }}}
}

2) Temporal table example
This example comes from https://developer.aliyun.com/article/679659. Assume there is an Orders table and a Rates (exchange rate) table. Orders come from different regions, so they are paid in different currencies, and we want to compute, for each order, the amount in Yen at the time the order was placed.
The SQL corresponding to this requirement:
SELECT o.currency, o.amount, r.rate,
       o.amount * r.rate AS yen_amount
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency

Implementation without a connector
object TemporalTableJoinTest {def main(args: Array[String]): Unit {val env StreamExecutionEnvironment.getExecutionEnvironmentval tEnv TableEnvironment.getTableEnvironment(env)env.setParallelism(1)
// 设置时间类型是 event-time env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 构造订单数据val ordersData new mutable.MutableList[(Long, String, Timestamp)]ordersData.((2L, Euro, new Timestamp(2L)))ordersData.((1L, US Dollar, new Timestamp(3L)))ordersData.((50L, Yen, new Timestamp(4L)))ordersData.((3L, Euro, new Timestamp(5L)))//构造汇率数据val ratesHistoryData new mutable.MutableList[(String, Long, Timestamp)]ratesHistoryData.((US Dollar, 102L, new Timestamp(1L)))ratesHistoryData.((Euro, 114L, new Timestamp(1L)))ratesHistoryData.((Yen, 1L, new Timestamp(1L)))ratesHistoryData.((Euro, 116L, new Timestamp(5L)))ratesHistoryData.((Euro, 119L, new Timestamp(7L)))// 进行订单表 event-time 的提取val orders env.fromCollection(ordersData).assignTimestampsAndWatermarks(new OrderTimestampExtractor[Long, String]()).toTable(tEnv, amount, currency, rowtime.rowtime)// 进行汇率表 event-time 的提取val ratesHistory env.fromCollection(ratesHistoryData).assignTimestampsAndWatermarks(new OrderTimestampExtractor[String, Long]()).toTable(tEnv, currency, rate, rowtime.rowtime)// 注册订单表和汇率表tEnv.registerTable(Orders, orders)tEnv.registerTable(RatesHistory, ratesHistory)val tab tEnv.scan(RatesHistory);
// 创建TemporalTableFunctionval temporalTableFunction tab.createTemporalTableFunction(rowtime, currency)
//注册TemporalTableFunction
    tEnv.registerFunction("Rates", temporalTableFunction)

    val sqlQuery =
      """
        |SELECT o.currency, o.amount, r.rate,
        |  o.amount * r.rate AS yen_amount
        |FROM
        |  Orders AS o,
        |  LATERAL TABLE (Rates(o.rowtime)) AS r
        |WHERE r.currency = o.currency
        |""".stripMargin

    tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))

    // print the query result
    val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
    result.print()
    env.execute()
  }
}

The OrderTimestampExtractor used above is implemented as follows:
import java.SQL.Timestampimport org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Timeclass OrderTimestampExtractor[T1, T2]extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {override def extractTimestamp(element: (T1, T2, Timestamp)): Long {element._3.getTime}
}

Implementation with the CSV connector
In real production development an actual connector definition is always needed, so below we use a CSV-format connector definition to build the temporal table join demo.
1) genEventRatesHistorySource
def genEventRatesHistorySource: CsvTableSource {val csvRecords Seq(ts#currency#rate,1#US Dollar#102,1#Euro#114,1#Yen#1,3#Euro#116,5#Euro#119,7#Pounds#108)// 测试数据写入临时文件val tempFilePath FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), csv_source_rate, tmp)// 创建Source connectornew CsvTableSource(tempFilePath,Array(ts,currency,rate),Array(Types.LONG,Types.STRING,Types.LONG),fieldDelim #,rowDelim CommonUtils.line,ignoreFirstLine true,ignoreComments %)}2、genRatesOrderSource def genRatesOrderSource: CsvTableSource {val csvRecords Seq(ts#currency#amount,2#Euro#10,4#Euro#10)// 测试数据写入临时文件val tempFilePath FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), csv_source_order, tmp)// 创建Source connectornew CsvTableSource(tempFilePath,Array(ts,currency, amount),Array(Types.LONG,Types.STRING,Types.LONG),fieldDelim #,rowDelim CommonUtils.line,ignoreFirstLine true,ignoreComments %)}3、主程序
import java.io.Fileimport org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.book.utils.{CommonUtils, FileUtils}
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Rowobject CsvTableSourceUtils {def genWordCountSource: CsvTableSource {val csvRecords Seq(words,Hello Flink,Hi, Apache Flink,Apache FlinkBook)// 测试数据写入临时文件val tempFilePath FileUtils.writeToTempFile(csvRecords.mkString($), csv_source_, tmp)// 创建Source connectornew CsvTableSource(tempFilePath,Array(words),Array(Types.STRING),fieldDelim #,rowDelim $,ignoreFirstLine true,ignoreComments %)}def genRatesHistorySource: CsvTableSource {val csvRecords Seq(rowtime ,currency ,rate,09:00:00 ,US Dollar , 102,09:00:00 ,Euro , 114,09:00:00 ,Yen , 1,10:45:00 ,Euro , 116,11:15:00 ,Euro , 119,11:49:00 ,Pounds , 108)// 测试数据写入临时文件val tempFilePath FileUtils.writeToTempFile(csvRecords.mkString($), csv_source_, tmp)// 创建Source connectornew CsvTableSource(tempFilePath,Array(rowtime,currency,rate),Array(Types.STRING,Types.STRING,Types.STRING),fieldDelim ,,rowDelim $,ignoreFirstLine true,ignoreComments %)}def genEventRatesHistorySource: CsvTableSource {val csvRecords Seq(ts#currency#rate,1#US Dollar#102,1#Euro#114,1#Yen#1,3#Euro#116,5#Euro#119,7#Pounds#108)// 测试数据写入临时文件val tempFilePath FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), csv_source_rate, tmp)// 创建Source connectornew CsvTableSource(tempFilePath,Array(ts,currency,rate),Array(Types.LONG,Types.STRING,Types.LONG),fieldDelim #,rowDelim CommonUtils.line,ignoreFirstLine true,ignoreComments %)}def genRatesOrderSource: CsvTableSource {val csvRecords Seq(ts#currency#amount,2#Euro#10,4#Euro#10)// 测试数据写入临时文件val tempFilePath FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), csv_source_order, tmp)// 创建Source connectornew CsvTableSource(tempFilePath,Array(ts,currency, amount),Array(Types.LONG,Types.STRING,Types.LONG),fieldDelim #,rowDelim CommonUtils.line,ignoreFirstLine true,ignoreComments %)}/*** Example:* genCsvSink(* Array[String](word, count),* Array[TypeInformation[_] ](Types.STRING, Types.LONG))*/def genCsvSink(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] {val tempFile File.createTempFile(csv_sink_, tem)if (tempFile.exists()) {tempFile.delete()}new CsvTableSink(tempFile.getAbsolutePath).configure(fieldNames, fieldTypes)}}4、运行结果
The examples above showed how to use the Table API to work with tables, views, and window functions, as well as how to query, filter, manipulate columns, aggregate, and join tables with the Table API. For set operations, order by, insert, group windows, over windows, and other table operations, see the next article in the series: 17. Flink Table API: supported operations, part 2.