长沙网站列表,上海黄浦网站建设,自建站 外贸,有动效得网站#x1f9d9;FlinkSQL#x1f3c2;#x1f93a;
Table API 和 SQL 是最上层的 API#xff0c;在 Flink 中这两种 API 被集成在一起#xff0c;SQL 执行的对象也是Flink 中的表#xff08;Table#xff09;#xff0c;所以我们一般会认为它们是一体的。
SQL API 是基于…
FlinkSQL
Table API 和 SQL 是最上层的 API在 Flink 中这两种 API 被集成在一起SQL 执行的对象也是Flink 中的表Table所以我们一般会认为它们是一体的。
SQL API 是基于 SQL 标准的 Apache Calcite 框架实现的可通过纯 SQL 来开发和运行一个 Flink 任务。 SQL 解析和验证Calcite 提供 SQL 解析和验证功能可以将 SQL 查询语句解析成抽象语法树AST并进行语法验证、类型检查等操作。 sql-client 准备
原神启动
启动hadoop
启动flink %FLINK_HOME%/bin/yarn-session.sh -d 启动Flink 的 sql-client %FLINK_HOME%/bin/sql-client.sh embedded -s yarn-session embedded这是 SQL 客户端的模式之一。embedded 模式表示 SQL 客户端将在同一进程中运行通常用于本地开发和测试。 -s yarn-session这部分指定了要连接的 Flink 集群模式。在这里yarn-session 表示你想要连接到一个运行在 Apache YARN 上的 Flink 会话集群。这样SQL 客户端将连接到远程的 Flink 集群而不是本地 Flink 集群。 (base) [roothadoop1 flink-1.17.0]# bin/sql-client.sh embedded -s yarn-session
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/export/server/flink-1.17.0/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/export/server/hadoop-3.3.3/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2023-10-17 10:22:10,471 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2023-10-17 10:22:10,471 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root.▒▓██▓██▒▓████▒▒█▓▒▓███▓▒▓███▓░░ ▒▒▒▓██▒ ▒░██▒ ▒▒▓▓█▓▓▒░ ▒██████▒ ░▒▓███▒ ▒█▒█▒░▓█ ███ ▓░▒██▓█ ▒▒▒▒▒▓██▓░▒░▓▓██░ █ ▒▒░ ███▓▓█ ▒█▒▒▒████░ ▒▓█▓ ██▒▒▒ ▓███▒░▒█▓▓██ ▓█▒ ▓█▒▓██▓ ░█░▓░▒▓████▒ ██ ▒█ █▓░▒█▒░▒█▒███▓░██▓ ▓█ █ █▓ ▒▓█▓▓█▒░██▓ ░█░ █ █▒ ▒█████▓▒ ██▓░▒███░ ░ █░ ▓ ░█ █████▒░░ ░█░▓ ▓░██▓█ ▒▒▓▒ ▓███████▓░ ▒█▒ ▒▓ ▓██▓▒██▓ ▓█ █▓█ ░▒█████▓▓▒░ ██▒▒ █ ▒ ▓█▒▓█▓ ▓█ ██▓ ░▓▓▓▓▓▓▓▒ ▒██▓ ░█▒▓█ █ ▓███▓▒░ ░▓▓▓███▓ ░▒░ ▓███▓ ██▒ ░▒▓▓███▓▓▓▓▓██████▓▒ ▓███ █▓███▒ ███ ░▓▓▒░░ ░▓████▓░ ░▒▓▒ █▓█▓▒▒▓▓██ ░▒▒░░░▒▒▒▒▓██▓░ █▓██ ▓░▒█ ▓▓▓▓▒░░ ▒█▓ ▒▓▓██▓ ▓▒ ▒▒▓▓█▓ ▓▒█ █▓░ ░▒▓▓██▒ ░▓█▒ ▒▒▒░▒▒▓█████▒██░ ▓█▒█▒ ▒▓▓▒ ▓█ █░ ░░░░ ░█▒▓█ ▒█▓ ░ █░ ▒█ █▓█▓ ██ █░ ▓▓ ▒█▓▓▓▒█░█▓ ░▓██░ ▓▒ ▓█▓▒░░░▒▓█░ ▒███ ▓█▓░ ▒ ░▒█▒██▒ ▓▓▓█▒ ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██░██▒ ▒▓▓▒ ▓██▓▒█▒ ░▓▓▓▓▒█▓░▓██▒ ▓░ ▒█▓█ ░░▒▒▒▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓ ▓░▒█░______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | _ \| |/ / \___ \| | | | | | | | | |/ _ \ _ \| __|| | | | | | | | ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|Welcome! Enter HELP; to list all available commands. QUIT; to exit.Command history file path: /root/.flink-sql-historyFlink SQL 配置
1结果显示模式 #默认table还可以设置为tableau、changelog SET sql-client.execution.result-modetableau; 3执行环境 SET execution.runtime-modestreaming; #默认 streaming也可以设置 batch 4默认并行度 SET parallelism.default1; 5设置状态TTL SET table.exec.state.ttl1000; 6通过sql 文件初始化 1创建sql 文件 vim conf/sql-client-init.sql SET sql-client.execution.result-modetableau; CREATE DATABASE mydatabase; 2启动时指定 sql 文件 /opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session -i conf/sql-client-init.sql 流处理中的表
可以看到其实关系型表和 SQL主要就是针对批处理设计的这和流处理有着天生的隔阂。接下来我们就来深入探讨一下流处理中表的概念。 动态表和持续查询
1动态表Dynamic Tables 我们所熟悉的表一般用来做批处理面向的是固定的数据集可以认为是“静态表”而动态表则完全不同它里面的数据会随时间变化。 当流中有新数据到来初始的表中会插入一行而基于这个表定义的 SQL 查询就应该在之前的基础上更新结果。这样得到的表就会不断地动态变化被称为“动态表”Dynamic Tables。
2持续查询Continuous Query 动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化因此基于它定义的 SQL 查询也不可能执行一次就得到最终结果。这样一来我们对动态表的查询也就永远不会停止一直在随着新数据的到来而继续执行。 这样的查询就被称作“持续查询”Continuous Query。对动态表定义的查询操作都是持续查询而持续查询的结果也会是一个动态表。 由于每次数据到来都会触发查询操作因此可以认为一次查询面对的数据集就是当前输入动态表中收到的所有数据。这相当于是对输入动态表做了一个“快照”snapshot当作有限数据集进行批处理流式数据的到来会触发连续不断的快照查询像动画一样连贯起来就构成了“持续查询”。 将流转换成动态表 如果把流看作一张表那么流中每个数据的到来都应该看作是对表的一次插入Insert操作会在表的末尾添加一行数据。因为流是连续不断的而且之前的输出结果无法改变、只能在后面追加所以我们其实是通过一个只有插入操作insert-only的更新日志changelog流来构建一个表。 例如当用户点击事件到来时就对应着动态表中的一次插入Insert操作每条数据就是表中的一行随着插入更多的点击事件得到的动态表将不断增长。 用 SQL 持续查询
1更新Update查询 我们在代码中定义了一个SQL 查询。 Table urlCountTable tableEnv.sqlQuery(SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user); 当原始动态表不停地插入新的数据时查询得到的 urlCountTable 会持续地进行更改。由于 count 数量可能会叠加增长因此这里的更改操作可以是简单的插入Insert也可以是对之前数据的更新Update。这种持续查询被称为更新查询Update Query更新查询得到的结果表如果想要转换成DataStream必须调用 toChangelogStream()方法。 2追加Append查询 上面的例子中查询过程用到了分组聚合结果表中就会产生更新操作。如果我们执行一个简单的条件查询结果表中就会像原始表 EventTable 一样只有插入Insert操作了 Table aliceVisitTable tableEnv.sqlQuery(SELECT url, user FROM EventTable WHERE user Cary); 结果表result 如果转换成DataStream可以直接调用toDataStream()方法。 将动态表转换为流 与关系型数据库中的表一样动态表也可以通过插入Insert、更新Update和删除Delete操作进行持续的更改。将动态表转换为流或将其写入外部系统时就需要对这些更改操作进行编码通过发送编码消息的方式告诉外部系统要执行的操作。在 Flink 中 Table API 和 SQL 支持三种编码方式 ⚫ 仅追加Append-only流 仅通过插入Insert更改来修改的动态表可以直接转换为“仅追加”流。这个流中发出的数据其实就是动态表中新增的每一行。 ⚫ 撤回Retract流 撤回流是包含两类消息的流添加add消息和撤回retract消息。 具体的编码规则是INSERT 插入操作编码为 add 消息DELETE 删除操作编码为 retract消息而 UPDATE 更新操作则编码为被更改行的 retract 消息和更新后行新行的 add 消息。这样我们可以通过编码后的消息指明所有的增删改操作一个动态表就可以转换为撤回流了。 ⚫ 更新插入Upsert流 更新插入流中只包含两种类型的消息更新插入upsert消息和删除delete消息。 所谓的“upsert”其实是“update”和“insert”的合成词所以对于更新插入流来说INSERT插入操作和UPDATE 更新操作统一被编码为 upsert 消息而 DELETE 删除操作则被编码为delete 消息。 时间属性
事件时间属性可以在创建表 DDL 中定义增加一个字段通过 WATERMARK 语句来定义事件时间属性。具体定义方式如下
CREATE TABLE EventTable( user STRING, url STRING, ts TIMESTAMP(3), WATERMARK FOR ts AS ts - INTERVAL 5 SECOND
) WITH ( ...
);
这里我们把ts 字段定义为事件时间属性而且基于ts 设置了5 秒的水位线延迟。
时间戳类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型。但是时间戳一般都是秒或者是毫秒BIGINT 类型这种情况可以通过如下方式转换 ts BIGINT, time_ltz AS TO_TIMESTAMP_LTZ(ts, 3), 在定义处理时间属性时必须要额外声明一个字段专门用来保存当前的处理时间。 在创建表的DDLCREATE TABLE 语句中可以增加一个额外的字段通过调用系统内置的PROCTIME()函数来指定当前的处理时间属性。
CREATE TABLE EventTable( user STRING, url STRING, ts AS PROCTIME()
) WITH ( ...
);
DDL
MySQL上_Int mian[]的博客-CSDN博客
MySQL下_Int mian[]的博客-CSDN博客
查询
DataGen Print
1创建数据生成器源表
BIRT小问题记录1--发生 BIRT 例外Encountered: \u00a0 (160), after : .-CSDN博客
CREATE TABLE source ( id INT, ts BIGINT, vc INT
) WITH ( connector datagen, rows-per-second1, fields.id.kindrandom, fields.id.min1, fields.id.max10, fields.ts.kindsequence, fields.ts.start1, fields.ts.end1000000, fields.vc.kindrandom, fields.vc.min1, fields.vc.max100
); CREATE TABLE sink ( id INT, ts BIGINT, vc INT
) WITH (
connector print
);
2查询源表
select * from source
3插入sink 表并查询
INSERT INTO sink select * from source;
select * from sink;
With 子句 WITH 提供了一种编写辅助语句的方法以便在较大的查询中使用。这些语句通常被称为公共表表达式(Common Table Expression, CTE)可以认为它们定义了仅为一个查询而存在的临时视图。
1语法 上一半没有; WITH source_with_total AS ( SELECT id, vc10 AS total FROM source ) SELECT id, SUM(total) FROM source_with_total GROUP BY id; SELECT WHERE 子句 SELECT * FROM source SELECT id, vc 10 FROM source -- 自定义 Source 的数据 SELECT id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price) SELECT vc 10 FROM source WHERE id 10 用作根据 key 进行数据去重 SELECT DISTINCT vc FROM source 对于流查询计算查询结果所需的状态可能无限增长。状态大小取决于不同行数。可以设置适当的状态生存时间(TTL)的查询配置以防止状态过大。但是这可能会影响查询结果的正确性。如某个 key 的数据过期从状态中删除了那么下次再来这么一个 key由于在状态中找不到就又会输出一遍。
窗口表值函数TVF聚合
从 1.13 版本开始分组窗口聚合已经标记为过时鼓励使用更强大、更有效的窗口 TVF聚合 FROM TABLE( 窗口类型 (TABLE 表名, DESCRIPTOR(时间字段),INTERVAL 时间…) ) GROUP BY [window_start,][window_end,] --可选 1滚动窗口
SELECT window_start, window_end, id , SUM(vc) sumVC
FROM TABLE( TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL 5 SECONDS))
GROUP BY window_start, window_end, id;
2 滑动窗口 要求 窗口长度滑动步长的整数倍底层会优化成多个小滚动窗口 SELECT window_start, window_end, id , SUM(vc) sumVC FROM TABLE( HOP(TABLE ws, DESCRIPTOR(et), INTERVAL 5 SECONDS , INTERVAL 10 SECONDS)) GROUP BY window_start, window_end, id; 3累积窗口 累积窗口可以认为是首先开一个最大窗口大小的滚动窗口然后根据用户设置的触发的时间间隔将这个滚动窗口拆分为多个窗口这些窗口具有相同的窗口起点和不同的窗口终点。
SELECT window_start, window_end, id , SUM(vc) sumVC
FROM TABLE( CUMULATE(TABLE ws, DESCRIPTOR(et), INTERVAL 2 SECONDS , INTERVAL 6 SECONDS))
GROUP BY window_start, window_end, id;
SQL Hints 临时属性
在执行查询时可以在表名后面添加 SQL Hints 来临时修改表属性对当前 job 生效。 select * from ws1/* OPTIONS(rows-per-second10)*/; 系统函数
System (Built-in) Functions | Apache Flink Flink SQL 提供了大量的系统函数几乎支持所有的标准SQL 中的操作这为我们使用SQL 编写流处理程序提供了极大的方便。
Module 操作
Module 允许 Flink 扩展函数能力。它是可插拔的Flink 官方本身已经提供了一些 Module用户也可以编写自己的 Module。
目前 Flink 包含了以下三种 Module
➢ CoreModuleCoreModule 是 Flink 内置的 Module其包含了目前 Flink 内置的所有 UDFFlink 默认开启的 Module 就是 CoreModule我们可以直接使用其中的 UDF
➢ HiveModuleHiveModule 可以将 Hive 内置函数作为 Flink 的系统函数提供给 SQL\Table API 用户进行使用比如 get_json_object 这类 Hive 内置函数Flink 默认的 CoreModule 是没有的
➢ 用户自定义 Module用户可以实现 Module 接口实现自己的 UDF 扩展 Module -- 加载 LOAD MODULE module_name [WITH (key1 val1, key2 val2, ...)] -- 卸载 UNLOAD MODULE module_name -- 查看 SHOW MODULES; SHOW FULL MODULES; 常用Connector
DataGen 和 Print 都是一种 connector其他connector 参考官网
Overview | Apache Flink
Kafka
1添加kafka 连接器依赖 1将 flink-sql-connector-kafka-1.17.0.jar 上传到flink 的 lib 目录下 2重启yarn-session、sql-client 2普通Kafka 表
CREATE TABLE t1( event_time TIMESTAMP(3) METADATA FROM timestamp, --列名和元数据名一致可以省略 FROM xxxx, VIRTUAL 表示只读 partition BIGINT METADATA VIRTUAL, offset BIGINT METADATA VIRTUAL,
id int,
ts bigint ,
vc int )
WITH ( connector kafka, properties.bootstrap.servers hadoop103:9092, properties.group.id atguigu,
-- earliest-offset, latest-offset, group-offsets, timestamp
and specific-offsets scan.startup.mode earliest-offset, -- fixed 为flink 实现的分区器一个并行度只写往kafka 一个分区
sink.partitioner fixed, topic ws1, format json
)
2插入Kafka 表 insert into t1(id,ts,vc) select * from source 3查询Kafka 表 select * from t1 如果当前表存在更新操作那么普通的 kafka 连接器将无法满足此时可以使用 Upsert Kafka 连接器。
connector upsert-kafka,
File
1创建FileSystem 映射表
CREATE TABLE t3( id int, ts bigint , vc int )
WITH ( connector filesystem, path hdfs://hadoop102:8020/data/t3, format csv
) 使用savepoint
1提交一个insert 作业可以给作业设置名称 INSERT INTO sink select * from source; 2查看job 列表 SHOW JOBS; 3停止作业触发 savepoint
SET state.checkpoints.dirhdfs://hadoop102:8020/chk;
SET state.savepoints.dirhdfs://hadoop102:8020/sp;
STOP JOB 228d70913eab60dda85c5e7f78b5782c WITH SAVEPOINT;
4从savepoint 恢复
-- 设置从savepoint 恢复的路径
SET execution.savepoint.pathhdfs://hadoop102:8020/sp/savepoint-37f5e6-0013a2874f0a; -- 之后直接提交 sql就会从savepoint 恢复 --允许跳过无法还原的保存点状态
set execution.savepoint.ignore-unclaimed-state true;
5恢复后重置路径 指定execution.savepoint.path 后将影响后面执行的所有DML 语句可以使用RESET 命 令重置这个配置选项。 RESET execution.savepoint.path; Catalog
Catalog 提供了元数据信息例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。
Catalog 类型 目前 Flink 包含了以下四种 Catalog ➢ GenericInMemoryCatalog基于内存实现的 Catalog所有元数据只在 session 的生命周期即一个 Flink 任务一次运行生命周期内内可用。默认自动创建会有名为“default_catalog”的内存Catalog这个Catalog默认只有一个名为“default_database”的数据库。 ➢ JdbcCatalogJdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。 Postgres Catalog 和MySQL Catalog 是目前仅有的两种 JDBC Catalog 实现将元数据 存储在数据库中。 ➢ HiveCatalog有两个用途一是单纯作为 Flink 元数据的持久化存储二是作为读 写现有 Hive 元数据的接口。注意Hive MetaStore 以小写形式存储所有元数据对象 名称。Hive Metastore 以小写形式存储所有元对象名称而 GenericInMemoryCatalog 会区分大小写。 JdbcCatalogMySQL
JdbcCatalog不支持建表只是打通flink与mysql的连接可以去读写mysql现有的库表。
上传所需 flink-connector-jdbc jar 包到 lib 下
重启flink 集群和 sql-client
创建Catalog JdbcCatalog 支持以下选项: ➢ name:必需Catalog 名称。 ➢ default-database:必需连接到的默认数据库。 ➢ username: 必需Postgres/MySQL 帐户的用户名。 ➢ password:必需该帐号的密码。 ➢ base-url:必需数据库的 jdbc url(不包含数据库名) 对于 Postgres Catalog是jdbc:postgresql://ip:端口 对于 MySQL Catalog是jdbc: mysql://ip:端口 CREATE CATALOG my_jdbc_catalog WITH( type jdbc, default-database test, username root, password 000000, base-url jdbc:mysql://hadoop102:3306
); SHOW CATALOGS; --查看当前的CATALOG SHOW CURRENT CATALOG; USE CATALOG my_jdbc_catalog; --查看当前的CATALOG SHOW CURRENT CATALOG; HiveCatalog
1上传所需flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar 包到 lib 下
2更换planner 依赖 只有在使用Hive 方言或HiveServer2 时才需要这样额外的计划器jar 移动但这是Hive 集成的推荐设置。
mv \
/opt/module/flink-1.17.0/lib/flink-table-planner-loader-1.17.0.jar \
/opt/module/flink-1.17.0/opt/flink-table-planner-loader-1.17.0.jar
3重启flink 集群和 sql-client 4启动外置的hive metastore 服务 Hive metastore 必须作为独立服务运行也就是hive-site 中必须配置 hive.metastore.uris hive --service metastore 创建Catalog CREATE CATALOG myhive WITH ( type hive, default-database default, hive-conf-dir /opt/module/hive/conf
);
4查看Catalog
SHOW CATALOGS; --查看当前的CATALOG SHOW CURRENT CATALOG; 5使用指定Catalog
USE CATALOG myhive; --查看当前的CATALOG SHOW CURRENT CATALOG;
JAVA代码中使用SQL
TableEnv
这里的依赖是一个 Java 的“桥接器”bridge主要就是负责 Table API 和下层DataStream API 的连接支持按照不同的语言分为 Java 版和 Scala 版。
dependency groupIdorg.apache.flink/groupId artifactIdflink-table-api-java-bridge/artifactId version${flink.version}/version
/dependency 如果我们希望在本地的集成开发环境IDE里运行 Table API 和 SQL还需要引入以下依赖
dependency groupIdorg.apache.flink/groupId artifactIdflink-table-planner-loader/artifactId version${flink.version}/version
/dependency dependency groupIdorg.apache.flink/groupId artifactIdflink-table-runtime/artifactId version${flink.version}/version
/dependency dependency groupIdorg.apache.flink/groupId artifactIdflink-connector-files/artifactId version${flink.version}/version
/dependency
对于 Flink 这样的流处理框架来说数据流和表在结构上还是有所区别的。所以使用Table API 和 SQL 需要一个特别的运行时环境这就是所谓的“表环境”TableEnvironment。
它主要负责
1注册Catalog 和表 2执行 SQL 查询 3注册用户自定义函数UDF 4DataStream 和表之间的转换。 每个表和 SQL 的执行都必须绑定在一个表环境TableEnvironment中。TableEnvironment 是 Table API 中提供的基本接口类可以通过调用静态的 create()方法来创建一个表环境实例。方法需要传入一个环境的配置参数 EnvironmentSettings它可以指定当前表环境的执行模式和计划器planner。执行模式有批处理和流处理两种选择默认是流处理模式计划器默认使用 blink planner。
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment; EnvironmentSettings settings EnvironmentSettings .newInstance() .inStreamingMode() // 使用流处理模式 .build(); TableEnvironment tableEnv TableEnvironment.create(setting);
对于流处理场景其实默认配置就完全够用了。所以我们也可以用另一种更加简单的方式来创建表环境
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; StreamExecutionEnvironment env
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);
创建表
具体创建表的方式有通过连接器connector和虚拟表virtual tables两种。
1连接器表Connector Tables 最直观的创建表的方式就是通过连接器connector连接到一个外部系统然后定义出对应的表结构。
tableEnv.executeSql(CREATE [TEMPORARY] TABLE MyTable ... WITH
( connector ... ));
2虚拟表Virtual Tables
在环境中注册之后我们就可以在 SQL 中直接使用这张表进行查询转换了。 Table newTable tableEnv.sqlQuery(SELECT ... FROM MyTable... ); 这里调用了表环境的 sqlQuery()方法直接传入一条 SQL 语句作为参数执行查询得到的结果是一个Table 对象。Table 是Table API 中提供的核心接口类就代表了一个Java 中定义的表实例。 由于 newTable 是一个 Table 对象并没有在表环境中注册所以如果希望直接在 SQL 中使用我们还需要将这个中间结果表注册到环境中 tableEnv.createTemporaryView(NewTable, newTable); 其实是创建了一个“虚拟表”Virtual Table。这个概念与 SQL 语法中的视图 View 非常类似
表的查询
创建好了表接下来自然就是对表进行查询转换了。对一个表的查询Query操作就对应着流数据的转换Transform处理。
Flink 为我们提供了两种查询方式SQL和Table API。
// 查询用户Alice 的点击事件并提取表中前两个字段
Table aliceVisitTable tableEnv.sqlQuery( SELECT user, url FROM EventTable WHERE user Alice );
我们也可以直接将查询的结果写入到已经注册的表中这需要调用表环境的executeSql()方法来执行DDL传入的是一个 INSERT 语句
// 注册表
tableEnv.executeSql(CREATE TABLE EventTable ... WITH ( connector ... ));
tableEnv.executeSql(CREATE TABLE OutputTable ... WITH ( connector ... )); // 将查询结果输出到 OutputTable 中
tableEnv.executeSql (
INSERT INTO OutputTable SELECT user, url FROM EventTable WHERE user Alice );
另外一种查询方式就是调用 Table API。这是嵌入在 Java 和 Scala 语言内的查询 API核心就是 Table 接口类通过一步步链式调用 Table 的方法就可以定义出所有的查询转换操作。
由于 Table API 是基于 Table 的 Java 实例进行调用的因此我们首先要得到表的 Java 对象。基于环境中已注册的表可以通过表环境的from()方法非常容易地得到一个Table 对象 Table eventTable tableEnv.from(EventTable); EventTable 是在环境中注册的表名。得到Table 对象之后就可以调用API 进行各种转换操作了得到的是一个新的Table 对象 Table maryClickTable eventTable .where($(user).isEqual(Alice)) .select($(url), $(user)); “$”符号用来指定表中的一个字段。 输出表
在代码上输出一张表最直接的方法就是调用 Table 的方法 executeInsert()方法将一个 Table 写入到注册过的表中方法传入的参数就是注册的表名
// 注册表用于输出数据到外部系统
tableEnv.executeSql(CREATE TABLE OutputTable ... WITH ( connector ... )); // 经过查询转换得到结果表
Table result ... // 将结果表写入已注册的输出表中
result.executeInsert(OutputTable); // TODO 4.输出表 // 4.1 sql 用法 // tableEnv.executeSql(insert into sink select * from tmp); // 4.2 tableapi 用法 result.executeInsert(sink); }
表和流的转换 想要将一个DataStream转换成表很简单可以通过调用表环境的fromDataStream()方法来实现返回的就是一个Table 对象。
1将流DataStream转换成表Table
1调用fromDataStream()方法
// 获取表环境
StreamTableEnvironment tableEnv StreamTableEnvironment.create(env)// 读取数据源
SingleOutputStreamOperatorWaterSensor sensorDS
env.fromSource(...) // 将数据流转换成表
Table sensorTable tableEnv.fromDataStream(sensorDS); 由于流中的数据本身就是定义好的 POJO 类型 WaterSensor所以我们将流转换成表之后一行数据就对应着一个WaterSensor而表中的列名就对应着 WaterSensor 中的属性。 另外我们还可以在fromDataStream()方法中增加参数用来指定提取哪些属性作为表中的字段名并可以任意指定位置 // 提取Event 中的 timestamp 和url 作为表中的列 Table sensorTable tableEnv.fromDataStream(sensorDS, $(id), $(vc)); 也可以通过表达式的 as()方法对字段进行重命名 // 将timestamp 字段重命名为ts Table sensorTable tableEnv.fromDataStream(sensorDS, $(id).as(sid), $(vc)); 2调用createTemporaryView()方法 调用 fromDataStream()方法简单直观可以直接实现 DataStream 到 Table 的转换不过如果我们希望直接在 SQL 中引用这张表就还需要调用表环境的 createTemporaryView()方法来创建虚拟视图了。 对于这种场景也有一种更简洁的调用方式。我们可以直接调用 createTemporaryView()方法创建虚拟表传入的两个参数第一个依然是注册的表名而第二个可以直接就是DataStream。之后仍旧可以传入多个参数用来指定表中的字段 tableEnv.createTemporaryView(sensorTable,sensorDS, $(id),$(ts),$(vc)); 2将表Table转换成流DataStream
1调用toDataStream()方法 将一个Table对象转换成DataStream非常简单只要直接调用表环境的方法toDataStream()就可以了。例如我们可以将 2.4 小节经查询转换得到的表 aliceClickTable 转换成流打印输出 tableEnv.toDataStream(table).print(); 2调用toChangelogStream()方法
Table table tableEnv.sqlQuery( SELECT id, sum(vc) FROM source GROUP BY id ); // 将 表 转 换 成 更 新 日 志 流
tableEnv.toChangelogStream(table).print();
自定义函数UDF
⚫ 标量函数Scalar Functions将输入的标量值转换成一个新的标量值
⚫ 表函数Table Functions将标量值转换成一个或多个新的行数据也就是扩展成一个表
⚫ 聚合函数Aggregate Functions将多行数据里的标量值转换成一个新的标量值
⚫ 表聚合函数Table Aggregate Functions将多行数据里的标量值转换成一个或多个新的行数据。
要想在代码中使用自定义的函数我们需要首先自定义对应 UDF 抽象类的实现并在表环境中注册这个函数然后就可以在Table API 和 SQL 中调用了。
1注册函数
注册函数时需要调用表环境的 createTemporarySystemFunction()方法传入注册的函数名以及UDF 类的Class 对象
// 注册函数
tableEnv.createTemporarySystemFunction(MyFunction, MyFunction.class);
2使用Table API 调用函数
在 Table API 中需要使用 call()方法来调用自定义函数 tableEnv.from(MyTable).select(call(MyFunction, $(my参数))); 3在 SQL 中调用函数 当我们将函数注册为系统函数之后在 SQL 中的调用就与内置系统函数完全一样了 tableEnv.sqlQuery(SELECT MyFunction(myField) FROM MyTable); 标量函数Scalar Functions 自定义标量函数可以把 0 个、 1 个或多个标量值转换成一个标量值它对应的输入是一行数据中的字段输出则是唯一的值。所以从输入和输出表中行数据的对应关系看标量函数是“一对一”的转换。 想要实现自定义的标量函数我们需要自定义一个类来继承抽象类 ScalarFunction并实现叫作 eval() 的求值方法。标量函数的行为就取决于求值方法的定义它必须是公有的public而且名字必须是 eval。求值方法 eval 可以重载多次任何数据类型都可作为求值方法的参数和返回值类型。 这里需要特别说明的是ScalarFunction 抽象类中并没有定义 eval()方法所以我们不能直接在代码中重写override但Table API的框架底层又要求了求值方法必须名字为eval()。 这是Table API 和 SQL 目前还显得不够完善的地方未来的版本应该会有所改进。 package com.yuange;import com.yuange.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;public class Main {public static void main(String[] args) throws Exception {// 环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);DataStreamSourceWaterSensor sensorDS env.fromElements(new WaterSensor(s1, 1L, 1),new WaterSensor(s1, 2L, 2),new WaterSensor(s2, 2L, 2),new WaterSensor(s3, 3L, 3),new WaterSensor(s3, 4L, 4));Table sensorTable tableEnv.fromDataStream(sensorDS);tableEnv.createTemporaryView(sensor, sensorTable);// TODO 2.注册函数tableEnv.createTemporaryFunction(HashFunction, HashFunction.class);// TODO 3.调用 自定义函数// 3.1 sql 用法// tableEnv.sqlQuery(select HashFunction(id) from sensor)// .execute() // 调用了 sql 的 execute就不需要
// .print();// 3.2 table api 用法sensorTable.select(call(HashFunction,$(id))).execute().print();
// env.execute();}// TODO 1.定义 自定义函数的实现类public static class HashFunction extends ScalarFunction {// 接受任意类型的输入返回 INT 型输出public int eval(DataTypeHint(inputGroup InputGroup.ANY) Object o) {return o.hashCode();}}
}
聚合函数Aggregate Functions
package com.yuange;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;import static org.apache.flink.table.api.Expressions.$;public class Main {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 姓名分数权重 DataStreamSourceTuple3String,Integer, IntegerscoreWeightDS env.fromElements(Tuple3.of(zs,80, 3),Tuple3.of(zs,90, 4),Tuple3.of(zs,95, 4),Tuple3.of(ls,75, 4),Tuple3.of(ls,65, 4),Tuple3.of(ls,85, 4));StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);Table scoreWeightTable tableEnv.fromDataStream(scoreWeightDS,$(f0).as(name),$(f1).as(score), $(f2).as(weight));tableEnv.createTemporaryView(scores, scoreWeightTable);// TODO 2.注册函数 tableEnv.createTemporaryFunction(WeightedAvg, WeightedAvg.class);// TODO 3.调用 自定义函数 tableEnv.sqlQuery(select name,WeightedAvg(score,weight) from scores group by name) .execute().print();}// TODO 1.继承 AggregateFunction 返回类型累加器类型加权总和权重总和 public static class WeightedAvg extends AggregateFunctionDouble, Tuple2Integer, Integer {Overridepublic Double getValue(Tuple2Integer, Integer integerIntegerTuple2) {return integerIntegerTuple2.f0 * 1D / integerIntegerTuple2.f1;}Overridepublic Tuple2Integer, Integer createAccumulator() {return Tuple2.of(0, 0); }/*** 累加计算的方法每来一行数据都会调用一次 * param acc 累加器类型 * param score 第一个参数分数 * param weight 第二个参数权重 */public void accumulate(Tuple2Integer, Integer acc,Integer score,Integer weight){acc.f0 score * weight; // 加权总和 分数1 * 权重1 分数2 * 权重2 ....acc.f1 weight; // 权重和 权重 1 权重2 .... }}}