做网站怎样套用模板,中小企业网站制作过程中要注意什么,忘记wordpress登录密码,品牌理念设计企业网站建设更多Paimon数据湖内容请关注#xff1a;https://edu.51cto.com/course/35051.html
虽然前面我们已经讲过如何查询Paimon表中的数据了#xff0c;但是有一些细节的东西还需要详细分析一下。 首先是针对Paimon中系统表的查询#xff0c;例如snapshots\schemas\options等等这些…更多Paimon数据湖内容请关注https://edu.51cto.com/course/35051.html
虽然前面我们已经讲过如何查询Paimon表中的数据了但是有一些细节的东西还需要详细分析一下。 首先是针对Paimon中系统表的查询例如snapshots\schemas\options等等这些系统表。 其实简单理解就是我们可以通过sql的形式查询系统表来查看实体表的快照、schema等信息这些信息我们也可以直接到hdfs中查看只是不太方便。 在查询数据的时候可以细分为批量读取和流式读取因为Paimon可以同时支持批处理和流处理。 在查询数据的时候如果想要从之前的某一个时间点开始查询数据也就说任务启动的时候想要查询一些历史数据则需要用到时间旅行这个特性可以在SQL查询语句中通过动态表选项指定scan.mode参数来控制具体查询哪些历史数据。
Scan Mode的值可以有多种不同的值代表不同的含义下面我们来具体分析一下
注意在分析的时候我们需要针对批处理和流处理这两种情况分别进行分析。 1default如果我们在执行查询的时候没有指定scan.mode参数则默认是default。但是此时需要注意如果我们也没有同时指定其他参数例如timestamp-millis\snapshot-id等scan相关的参数那么默认会执行latest-full策略。 所以说我们在执行查询的时候如果没有指定任何scan相关的参数那么默认执行的策略就是latest-full。 2latest-full和full是一样的效果不过full这个参数已经被标记为过期了。针对批处理表示只读取最新快照中的所有数据读取完成以后任务就执行结束了。针对流处理表示第一次启动时读取最新快照中的所有数据然后继续读取后续新增的变更数据这个任务会一直运行。 3latest针对批处理他的执行效果和latest-full是一样的只会读取最新快照中的所有数据。但是针对流处理就不一样了此时表示只读取最新的变更数据也就是说任务启动之后只读取新增的数据之前的历史快照中的数据不读取。类似于kafka中消费者里面的latest消费策略。 4from-snapshot使用此策略的时候需要同时指定snapshot-id参数。针对批处理表示只读取指定id的快照中的所有数据。针对流处理表示从指定id的快照开始读取变更数据注意此时不是读取这个快照中的所有数据而是读取此快照中的变更数据也可以理解为这个快照和上一个快照相比新增的数据当然后续新增的变更数据也是可以读取到的因为这个是流处理他会一直执行读取操作。 5from-snapshot-full使用此策略的时候也需要同时指定snapshot-id参数。针对批处理他的执行效果和from-snapshot是一样的。针对流处理表示第一次启动时读取指定id的快照中的所有数据然后继续读取后续新增的变更数据此时任务会一直执行。 6from-timestamp使用此策略的时候需要同时指定timestamp-millis参数。针对批处理表示只读取指定时间戳的快照中的所有数据。针对流处理表示从指定时间戳的快照开始读取变更数据注意这里也是读取这个快照中的变更数据不是所有数据。然后读取后续新增的变更数据。 7incremental表示是增量查询这个主要是针对批处理的通过这种策略可以读取开始和结束快照之间的增量变化。开始和结束快照可以通过快照id或者是时间戳进行指定。 如果是使用快照id则需要通过incremental-between参数指定。 如果是使用时间戳则需要通过incremental-between-timestamp参数指定。 8compacted-full想要使用这个参数有一个前提Paimon表需要开启完全压缩(full compaction)。此时针对批处理表示只读取最新完全压缩(full compaction)的快照中的所有数据。针对流处理表示第一次启动时读取最新完全压缩(full compaction)的快照中的所有数据然后继续读取后续新增的变更数据。
针对这里面的latest、latest-full、compacted-full这几种策略放在一起可能容易混淆下面我们来通过一个图重新梳理一下
首先看中间这条线表示是数据的时间轴左边是历史数据右边是最新产生的数据。
中间这条线上面是批处理下面是流处理。
我们首先来看批处理 如果我们指定了scan.mode为latest-full或者是latest则会读取最新的快照中的所有数据也就是Last Snapshot中的数据。 如果我们指定了scan.mode为compacted-full则会读取最新的完全压缩(full compaction)的快照中的数据也就是Last Compact Snapshot中的数据。
接下来看一下流处理 如果我们指定了scan.mode为latest-full则会在任务第一次启动时读取最新快照中的所有数据然后继续读取后续新增的变更数据。也就是第一次启动时先读取Last Snapshot中的所有数据接着读取后续新产生的数据。 如果我们指定了scan.mode为latest则此时只读取最新的变更数据不读取LastSnapshot快照中的数据。 如果我们指定了scan.mode为compacted-full则第一次启动时会读取最新完全压缩(full compaction)的快照中的所有数据也就是Last Compact Snapshot中的数据接着读取后续新产生的数据。
这就是这些策略在批处理和流处理中的执行流程。
1查询系统表
下面我们来通过具体的案例来演示一下前面提到的查询数据相关的用法。
首先创建一个向Paimon表中模拟写入数据的类便于一会测试使用 创建packagetech.xuwei.paimon.query
创建objectFlinkSQLWriteToPaimon
代码如下
package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 通过FlinkSQL 向Paimon中模拟写入数据* Created by xuwei*/
object FlinkSQLWriteToPaimon {def main(args: Array[String]): Unit {//获取执行环境val env StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql(|CREATE CATALOG paimon_catalog WITH (| typepaimon,| warehousehdfs://bigdata01:9000/paimon|)|.stripMargin)tEnv.executeSql(USE CATALOG paimon_catalog)//创建Paimon类型的表tEnv.executeSql(|CREATE TABLE IF NOT EXISTS query_table(| name STRING,| age INT,| PRIMARY KEY (name) NOT ENFORCED|)|.stripMargin)//写入数据tEnv.executeSql(INSERT INTO query_table(name,age) VALUES(jack,18))tEnv.executeSql(INSERT INTO query_table(name,age) VALUES(tom,19))tEnv.executeSql(INSERT INTO query_table(name,age) VALUES(mick,20))}}在idea中运行这个代码。
接下来创建一个类来查询一下Paimon中的系统表。
创建objectFlinkPaimonSystemTable
代码如下
package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 查询Paimon中的系统表* Created by xuwei*/
object FlinkPaimonSystemTable {def main(args: Array[String]): Unit {//获取执行环境val env StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql(|CREATE CATALOG paimon_catalog WITH (| typepaimon,| warehousehdfs://bigdata01:9000/paimon|)|.stripMargin)tEnv.executeSql(USE CATALOG paimon_catalog)//snapshot信息表对应的其实就是hdfs中表的snapshot目录下的snapshot-*文件信息println(snapshot信息表)tEnv.executeSql(SELECT * FROM query_table$snapshots).print()//schema信息表对应的其实就是hdfs中表的schema目录下的schema-*文件信息println(schema信息表)tEnv.executeSql(SELECT * FROM query_table$schemas).print()//manifest信息表对应的其实就是hdfs中表的manifest目录下的manifest-*文件信息println(manifest信息表)tEnv.executeSql(SELECT * FROM query_table$manifests).print()//file信息表对应的其实就是hdfs中表的bucket-*目录下的data-*文件信息println(file信息表)tEnv.executeSql(SELECT * FROM query_table$files).print()//option信息表对应的就是建表语句中with里面指定的参数信息在表的schema-*文件中也能看到option信息println(option信息表)tEnv.executeSql(SELECT * FROM query_table$options).print()//consumer信息表在查询数据的sql语句中指定了consumer-id之后才能看到println(consumer信息表)tEnv.executeSql(SELECT * FROM query_table$consumers).print()//audit log信息表相当于是表的审核日志可以看到表中每条数据的rowkind也就是I\-U\U\-Dprintln(audit log信息表)tEnv.executeSql(SELECT * FROM query_table$audit_log).print()}
}运行代码。 注意在本地执行flink sql中的print会看到下面错误
java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1026)at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:899)at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:823)at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91)at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:210)at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:219)at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120)at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)at tech.xuwei.paimon.query.FlinkPaimonSystemTable$.main(FlinkPaimonSystemTable.scala:35)at tech.xuwei.paimon.query.FlinkPaimonSystemTable.main(FlinkPaimonSystemTable.scala)这个异常不影响程序执行实际工作中我们不会写这种代码一般都是在sql中写insert into select语句了在这主要是为了方便测试忽略这个异常即可。
如果感觉看起来比较乱可以修改一下log4j.properties日志中的告警级别改为error级别即可。
log4j.rootLoggererror,stdoutlog4j.appender.stdout org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target System.out
log4j.appender.stdout.layoutorg.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n重新运行代码可以看到如下结果
snapshot信息表
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| op | snapshot_id | schema_id | commit_user | commit_identifier | commit_kind | commit_time | total_record_count | delta_record_count | changelog_record_count | watermark |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| I | 1 | 0 | 8f74d97b-bf6b-4ac7-bb47-3bb... | 9223372036854775807 | APPEND | 2023-07-28 17:35:22.859 | 1 | 1 | 0 | -9223372036854775808 |
| I | 2 | 0 | 49412497-1749-4566-8bf8-1c5... | 9223372036854775807 | APPEND | 2023-07-28 17:35:24.802 | 2 | 1 | 0 | -9223372036854775808 |
| I | 3 | 0 | e55e756d-e528-4b7c-97f0-a01... | 9223372036854775807 | APPEND | 2023-07-28 17:35:26.409 | 3 | 1 | 0 | -9223372036854775808 |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
3 rows in set
schema信息表
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| op | schema_id | fields | partition_keys | primary_keys | options | comment |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| I | 0 | [{id:0,name:name,typ... | [] | [name] | {} | |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
1 row in set
manifest信息表
----------------------------------------------------------------------------------------------------------------------------
| op | file_name | file_size | num_added_files | num_deleted_files | schema_id |
----------------------------------------------------------------------------------------------------------------------------
| I | manifest-800ac729-22d3-494b... | 1665 | 1 | 0 | 0 |
| I | manifest-61d14e4e-d2a0-42ac... | 1675 | 1 | 0 | 0 |
| I | manifest-fd8e45b0-d456-467a... | 1673 | 1 | 0 | 0 |
----------------------------------------------------------------------------------------------------------------------------
3 rows in set
file信息表
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| op | partition | bucket | file_path | file_format | schema_id | level | record_count | file_size_in_bytes | min_key | max_key | null_value_counts | min_value_stats | max_value_stats | creation_time |
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| I | [] | 0 | data-6b23bcaf-3dbe-46c0-a67... | orc | 0 | 0 | 1 | 566 | [jack] | [jack] | {age0, name0} | {age18, namejack} | {age18, namejack} | 2023-07-28 17:35:22.453 |
| I | [] | 0 | data-ce40f0df-aa2a-4682-8b6... | orc | 0 | 0 | 1 | 581 | [mick] | [mick] | {age0, name0} | {age20, namemick} | {age20, namemick} | 2023-07-28 17:35:26.257 |
| I | [] | 0 | data-ac9bd895-2b8e-4efe-969... | orc | 0 | 0 | 1 | 572 | [tom] | [tom] | {age0, name0} | {age19, nametom} | {age19, nametom} | 2023-07-28 17:35:24.603 |
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
3 rows in set
option信息表
Empty set
tag信息表
Empty set
consumer信息表
Empty set
audit log信息表
---------------------------------------------------------------------------------
| op | rowkind | name | age |
---------------------------------------------------------------------------------
| I | I | jack | 18 |
| I | I | mick | 20 |
| I | I | tom | 19 |
---------------------------------------------------------------------------------
3 rows in set2批量读取
下面演示一下如何在批量读取中使用时间旅行功能。
创建objecttech.xuwei.paimon.query.FlinkPaimonBatchQuery
代码如下
package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 批量读取* Created by xuwei*/
object FlinkPaimonBatchQuery {def main(args: Array[String]): Unit {//获取执行环境val env StreamExecutionEnvironment.getExecutionEnvironment//SET execution.runtime-mode batch;env.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql(|CREATE CATALOG paimon_catalog WITH (| typepaimon,| warehousehdfs://bigdata01:9000/paimon|)|.stripMargin)tEnv.executeSql(USE CATALOG paimon_catalog)//批量查询数据tEnv.executeSql(|SELECT * FROM query_table|-- /* OPTIONS(scan.modelatest-full) */ -- 默认策略可以省略不写只读取最新快照中的所有数据|-- /* OPTIONS(scan.modelatest) */ -- 在批处理模式下和latest-full的效果一致|-- /* OPTIONS(scan.modefrom-snapshot,scan.snapshot-id 2) */ -- 只读取指定id的快照中的所有数据|-- /* OPTIONS(scan.modefrom-snapshot-full,scan.snapshot-id 2) */ -- 在批处理模式下和from-snapshot的效果一致|-- /* OPTIONS(scan.modefrom-timestamp,scan.timestamp-millis 1690536924802) */ -- 只读取指定时间戳的快照中的所有数据|-- /* OPTIONS(scan.modeincremental,incremental-between 1,3) */ -- 指定两个快照id查询这两个快照之间的增量变化|-- /* OPTIONS(scan.modeincremental,incremental-between-timestamp 1690536922859,1690536926409) */ -- 指定两个时间戳查询这两个快照之间的增量变化|.stripMargin).print()}
}运行代码查看每一种策略的数据结果。
注意在演示compacted-full这种策略的时候需要给表开启full-compaction。
所以重新创建一个新的表。
创建objectFlinkSQLWriteToPaimonForCompact
代码如下
package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 通过FlinkSQL 向Paimon中模拟写入数据* Created by xuwei*/
object FlinkSQLWriteToPaimonForCompact {def main(args: Array[String]): Unit {//获取执行环境val env StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql(|CREATE CATALOG paimon_catalog WITH (| typepaimon,| warehousehdfs://bigdata01:9000/paimon|)|.stripMargin)tEnv.executeSql(USE CATALOG paimon_catalog)//创建Paimon类型的表tEnv.executeSql(|CREATE TABLE IF NOT EXISTS query_table_compact(| name STRING,| age INT,| PRIMARY KEY (name) NOT ENFORCED|)WITH(| changelog-producer full-compaction,| full-compaction.delta-commits 1|)|.stripMargin)//写入数据tEnv.executeSql(INSERT INTO query_table_compact(name,age) VALUES(jack,18))tEnv.executeSql(INSERT INTO query_table_compact(name,age) VALUES(tom,19))tEnv.executeSql(INSERT INTO query_table_compact(name,age) VALUES(mick,20))}}运行代码。
再创建一个新的读取数据的类
创建objectFlinkPaimonBatchQueryForCompact
代码如下
package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 批量读取* Created by xuwei*/
object FlinkPaimonBatchQueryForCompact {def main(args: Array[String]): Unit {//获取执行环境val env StreamExecutionEnvironment.getExecutionEnvironment//SET execution.runtime-mode batch;env.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql(|CREATE CATALOG paimon_catalog WITH (| typepaimon,| warehousehdfs://bigdata01:9000/paimon|)|.stripMargin)tEnv.executeSql(USE CATALOG paimon_catalog)//批量查询数据tEnv.executeSql(|SELECT * FROM query_table_compact|/* OPTIONS(scan.mode compacted-full) */ --表需要开启full-compaction设置changelog-producer和full-compaction.delta-commits|.stripMargin).print()}
}运行代码可以看到如下结果
-------------------------------------------------
| op | name | age |
-------------------------------------------------
| I | jack | 18 |
| I | mick | 20 |
| I | tom | 19 |
-------------------------------------------------由于目前每一次提交数据都会触发完全压缩所以我们查询最新的完全压缩快照中的数据是可以获取到所有数据的。
此时可以通过系统表查看一下这个表的snapshot信息 创建objectFlinkPaimonSystemTableForCompact
代码如下
package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 查询Paimon中的系统表* Created by xuwei*/
object FlinkPaimonSystemTableForCompact {def main(args: Array[String]): Unit {//获取执行环境val env StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql(|CREATE CATALOG paimon_catalog WITH (| typepaimon,| warehousehdfs://bigdata01:9000/paimon|)|.stripMargin)tEnv.executeSql(USE CATALOG paimon_catalog)//snapshot信息表对应的其实就是hdfs中表的snapshot目录下的snapshot-*文件信息println(snapshot信息表)tEnv.executeSql(SELECT * FROM query_table_compact$snapshots).print()}
}执行代码可以看到如下结果
snapshot信息表
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| op | snapshot_id | schema_id | commit_user | commit_identifier | commit_kind | commit_time | total_record_count | delta_record_count | changelog_record_count | watermark |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| I | 1 | 0 | 38d8f3a4-aeb3-4072-90cf-421... | 9223372036854775807 | APPEND | 2023-07-28 17:57:07.293 | 1 | 1 | 0 | -9223372036854775808 |
| I | 2 | 0 | 38d8f3a4-aeb3-4072-90cf-421... | 9223372036854775807 | COMPACT | 2023-07-28 17:57:08.211 | 3 | 2 | 1 | -9223372036854775808 |
| I | 3 | 0 | 84203720-0e42-40a6-8202-642... | 9223372036854775807 | APPEND | 2023-07-28 17:57:09.423 | 4 | 1 | 0 | -9223372036854775808 |
| I | 4 | 0 | 84203720-0e42-40a6-8202-642... | 9223372036854775807 | COMPACT | 2023-07-28 17:57:09.641 | 8 | 4 | 1 | -9223372036854775808 |
| I | 5 | 0 | 25d0f600-076a-407f-a07a-caf... | 9223372036854775807 | APPEND | 2023-07-28 17:57:11.500 | 9 | 1 | 0 | -9223372036854775808 |
| I | 6 | 0 | 25d0f600-076a-407f-a07a-caf... | 9223372036854775807 | COMPACT | 2023-07-28 17:57:12.130 | 15 | 6 | 1 | -9223372036854775808 |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------此时可以看到在commit_kind这一列中显示的有APPEND和COMPACT表示这个快照是追加产生的还是完全压缩产生的。
由于我们配置的每一次提交数据都会触发完全压缩所以对应的有3个完全压缩产生的快照。
为了便于验证我们可以把最新的那个完全压缩的快照删除掉再执行查询看看结果是什么样的
删除最新的完全压缩的快照
[rootbigdata04 ~]# hdfs dfs -rm -r /paimon/default.db/query_table_compact/snapshot/snapshot-6注意这个删除操作建议大家在命令行执行不要在web页面执行在web页面删除可能会直接把这个表的目录删除掉 然后再执行FlinkPaimonBatchQueryForCompact结果如下
-------------------------------------------------
| op | name | age |
-------------------------------------------------
| I | jack | 18 |
| I | tom | 19 |
-------------------------------------------------
2 rows in set注意此时最新的完全压缩的快照就是snapshot-4了这个快照中只有2条数据。
这就是批量读取中时间旅行参数的使用。
3流式读取
下面演示一下如何在流式读取中使用时间旅行功能。
创建objectFlinkPaimonStreamingQuery
代码如下
package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 流式读取* Created by xuwei*/
object FlinkPaimonStreamingQuery {def main(args: Array[String]): Unit {//获取执行环境val env StreamExecutionEnvironment.getExecutionEnvironment//SET execution.runtime-mode streaming;env.setRuntimeMode(RuntimeExecutionMode.STREAMING)//使用流处理模式val tEnv StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql(|CREATE CATALOG paimon_catalog WITH (| typepaimon,| warehousehdfs://bigdata01:9000/paimon|)|.stripMargin)tEnv.executeSql(USE CATALOG paimon_catalog)//流式查询数据tEnv.executeSql(|SELECT * FROM query_table|-- /* OPTIONS(scan.modelatest-full) */ -- 默认策略可以省略不写第一次启动时读取最新快照中的所有数据然后继续读取后续新增的变更数据|-- /* OPTIONS(scan.modelatest) */ -- 只读取最新的变更数据|-- /* OPTIONS(scan.modefrom-snapshot,scan.snapshot-id 2) */ -- 从指定id的快照开始读取变更数据(包含后续新增)|-- /* OPTIONS(scan.modefrom-snapshot-full,scan.snapshot-id 2) */ -- 第一次启动时读取指定id的快照中的所有数据然后继续读取后续新增的变更数据|-- /* OPTIONS(scan.modefrom-timestamp,scan.timestamp-millis 1690536924802) */ -- 从指定时间戳的快照开始读取变更数据(包含后续新增)|.stripMargin).print()}
}4Consumer ID
最后我们在流式读取这里扩展一个知识点Consumer ID这个功能是针对流式读取设计的。
相当于我们在kafka消费者中指定一个groupid这样可以通过groupid维护消费数据的偏移量信息便于任务停止以后重启的时候继续基于之前的进度进行查询。
在这里Consumer ID的主要作用是为了方便记录每次查询到的数据快照的位置他会把下一个还未读取的快照id记录到hdfs文件中。 当之前的任务停止以后新启动的任务可以基于之前任务记录的快照id继续查询数据不需要从状态中恢复位置信息。
这个特性目前属于实验特性还没有经过大量生产环境的验证大家可以先提前了解一下。
下面来结合一个案例演示一下 具体的思路是这样的
1首先使用Consumer ID查询一次query_table表中的数据。2然后停止之前的查询任务向query_table表中模拟产生1条数据。3重新启动第1步骤中的任务验证一下是否只读取到了新增的那1条数据
创建objectFlinkPaimonStreamingQueryForConsumerid
代码如下
package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 流式读取* Created by xuwei*/
object FlinkPaimonStreamingQueryForConsumerid {def main(args: Array[String]): Unit {//获取执行环境val env StreamExecutionEnvironment.getExecutionEnvironment//SET execution.runtime-mode streaming;env.setRuntimeMode(RuntimeExecutionMode.STREAMING)//使用流处理模式val tEnv StreamTableEnvironment.create(env)//注意在流处理模式中操作Paimon表时需要开启Checkpoint。env.enableCheckpointing(5000)//创建Paimon类型的CatalogtEnv.executeSql(|CREATE CATALOG paimon_catalog WITH (| typepaimon,| warehousehdfs://bigdata01:9000/paimon|)|.stripMargin)tEnv.executeSql(USE CATALOG paimon_catalog)//流式查询数据tEnv.executeSql(|SELECT * FROM query_table|/* OPTIONS(consumer-idcon-1) */ -- 指定消费者id|.stripMargin).print()}
}注意在这需要开启checkpoint否则Consumer ID的功能无法正常触发。
第一次执行此代码可以看到如下结果
-------------------------------------------------
| op | name | age |
-------------------------------------------------
| I | jack | 18 |
| I | mick | 20 |
| I | tom | 19 |停止此代码。
此时其实可以到hdfs中查看一下维护的Consumer ID信息
[rootbigdata04 ~]# hdfs dfs -cat /paimon/default.db/query_table/consumer/consumer-con-1
{nextSnapshot : 4
}这里面记录的是下一次需要读取的快照id数值为4此时最新的快照id是3因为快照id为3的快照已经读取过了下一个快照id就是4了。
其实直接查询consumer系统表也是可以看到这些信息的。
创建objectFlinkPaimonSystemTableForConsumerid
代码如下
package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 查询Paimon中的系统表* Created by xuwei*/
object FlinkPaimonSystemTableForConsumerid {def main(args: Array[String]): Unit {//获取执行环境val env StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql(|CREATE CATALOG paimon_catalog WITH (| typepaimon,| warehousehdfs://bigdata01:9000/paimon|)|.stripMargin)tEnv.executeSql(USE CATALOG paimon_catalog)//consumer信息表在查询数据的sql语句中指定了consumer-id之后才能看到println(consumer信息表)tEnv.executeSql(SELECT * FROM query_table$consumers).print()}
}执行代码可以看到如下结果
consumer信息表
----------------------------------------------------------
| op | consumer_id | next_snapshot_id |
----------------------------------------------------------
| I | con-1 | 4 |
----------------------------------------------------------
1 row in set从这可以看出来next_snapshot_id是4查出来的结果是一样的。
接下来我们向query_table中新增一条数据。
创建objectFlinkSQLWriteToPaimonForConsumerid
代码如下
package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 通过FlinkSQL 向Paimon中模拟写入数据* Created by xuwei*/
object FlinkSQLWriteToPaimonForConsumerid {def main(args: Array[String]): Unit {//获取执行环境val env StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql(|CREATE CATALOG paimon_catalog WITH (| typepaimon,| warehousehdfs://bigdata01:9000/paimon|)|.stripMargin)tEnv.executeSql(USE CATALOG paimon_catalog)//创建Paimon类型的表tEnv.executeSql(|CREATE TABLE IF NOT EXISTS query_table(| name STRING,| age INT,| PRIMARY KEY (name) NOT ENFORCED|)|.stripMargin)//写入数据tEnv.executeSql(INSERT INTO query_table(name,age) VALUES(jessic,30))}}执行代码。
最后我们再重新启动FlinkPaimonStreamingQueryForConsumerid可以看到如下结果
-------------------------------------------------
| op | name | age |
-------------------------------------------------
| I | jessic | 30 |能看到这个结果说明这个consumer id生效了当我们第二次使用相同的consumer id读取这个表的时候是可以基于之前的进度继续读取的。
停止此任务。
此时再执行FlinkPaimonSystemTableForConsumerid查看最新的next_snapshot_id
consumer信息表
----------------------------------------------------------
| op | consumer_id | next_snapshot_id |
----------------------------------------------------------
| I | con-1 | 5 |
----------------------------------------------------------
1 row in set此时next_snapshot_id变成了5这是正确的。
更多Paimon数据湖内容请关注https://edu.51cto.com/course/35051.html