网站开发用什么写,php mysql 企业网站源码,战略网页游戏开服表,中国建设造价协会网站文章目录 简介概述作用特性数据存储、计算引擎插件化实时流批一体数据表演化#xff08;Table Evolution#xff09;模式演化#xff08;Schema Evolution#xff09;分区演化#xff08;Partition Evolution#xff09;列顺序演化#xff08;Sort Order Evolution… 文章目录 简介概述作用特性数据存储、计算引擎插件化实时流批一体数据表演化Table Evolution模式演化Schema Evolution分区演化Partition Evolution列顺序演化Sort Order Evolution隐藏分区Hidden Partition镜像数据查询Time Travel支持事务ACID基于乐观锁的并发支持文件级数据剪裁 其他数据湖框架的对比 存储结构数据文件 data files表快照 Snapshot清单列表 Manifest list清单文件 Manifest file 与 Hive集成环境准备创建和管理 Catalog默认使用 HiveCatalog指定 Catalog 类型指定路径加载 基本操作创建表修改表插入表删除表 与 Spark SQL集成环境准备Spark 配置 CatalogHive CatalogHadoop Catalog SQL 操作创建表删除表修改表插入数据查询数据存储过程 DataFrame 操作环境准备读取表检查表写入表维护表 与 Flink SQL 集成环境准备创建和使用 Catalog语法说明Hive CatalogHadoop Catalog配置sql-client初始化文件 DDL 语句创建数据库创建表修改表删除表 插入语句INSERT INTOINSERT OVERWRITEUPSERT 查询语句Batch模式Streaming模式 与Flink集成的不足 与 Flink DataStream 集成环境准备读取数据常规Source写法FLIP-27 Source写法 写入数据合并小文件 简介
概述
为了解决数据存储和计算引擎之间的适配的问题Netflix开发了Iceberg2018年11月16日进入Apache孵化器2020 年5月19日从孵化器毕业成为Apache的顶级项目。
Iceberg是一个面向海量数据分析场景的开放表格式Table Format。表格式Table Format可以理解为元数据以及数据文件的一种组织方式处于计算框架FlinkSpark…之下数据文件之上。
作用
大数据领域发展至今已经经历了相当长时间的发展和探索虽然大数据技术的出现和迭代降低了用户处理海量数据的门槛但是有一个问题不能忽视数据格式对不同引擎适配的对接。
也就是说我们在使用不同的引擎进行计算时需要将数据根据引擎进行适配。这是相当棘手的问题。
为此出现了一种新的解决方案介于上层计算引擎和底层存储格式之间的一个中间层。这个中间层不是数据存储的方式只是定义了数据的元数据组织方式并且向引擎层面提供统一的类似传统数据库中表的语义。它的底层仍然是Parquet、ORC等存储格式。基于此Netflix开发了Iceberg目前已经是Apache的顶级项目。
特性
数据存储、计算引擎插件化
Iceberg提供一个开放通用的表格式Table Format实现方案不和特定的数据存储、计算引擎绑定。目前大数据领域的常见数据存储HDFS、S3…计算引擎Flink、Spark…都可以接入Iceberg。
在生产环境中可选择不同的组件搭使用。甚至可以不通过计算引擎直接读取存在文件系统上的数据。
实时流批一体
Iceberg上游组件将数据写入完成后下游组件及时可读可查询。可以满足实时场景。并且Iceberg同时提供了流/批读接口、流/批写接口。可以在同一个流程里, 同时处理流数据和批数据大大简化了ETL链路。
数据表演化Table Evolution
Iceberg可以通过SQL的方式进行表级别模式演进。进行这些操作的时候代价极低。 不存在读出数据重新写入或者迁移数据这种费时费力的操作。
比如在常用的Hive中如果我们需要把一个按天分区的表改成按小时分区。此时不能再原表之上直接修改只能新建一个按小时分区的表然后再把数据Insert到新的小时分区表。而且即使我们通过Rename的命令把新表的名字改为原表使用原表的上次层应用, 也可能由于分区字段修改导致需要修改 SQL这样花费的经历是非常繁琐的。
模式演化Schema Evolution
Iceberg支持下面几种模式演化 ADD向表或者嵌套结构增加新列 Drop从表中或者嵌套结构中移除一列 Rename重命名表中或者嵌套结构中的一列 Update将复杂结构(struct, mapkey, value, list)中的基本类型扩展类型长度, 比如tinyint修改成int. Reorder改变列或者嵌套结构中字段的排列顺序
Iceberg保证模式演化Schema Evolution是没有副作用的独立操作流程, 一个元数据操作, 不会涉及到重写数据文件的过程。具体的如下: 增加列时候不会从另外一个列中读取已存在的的数据 删除列或者嵌套结构中字段的时候不会改变任何其他列的值 更新列或者嵌套结构中字段的时候不会改变任何其他列的值 改变列列或者嵌套结构中字段顺序的时候不会改变相关联的值
在表中Iceberg 使用唯一ID来定位每一列的信息。新增一个列的时候,会新分配给它一个唯一ID, 并且绝对不会使用已经被使用的ID。
使用名称或者位置信息来定位列的, 都会存在一些问题, 比如使用名称的话,名称可能会重复, 使用位置的话, 不能修改顺序并且废弃的字段也不能删除。
分区演化Partition Evolution
Iceberg可以在一个已存在的表上直接修改因为Iceberg的查询流程并不和分区信息直接关联。
当我们改变一个表的分区策略时对应修改分区之前的数据不会改变, 依然会采用老的分区策略新的数据会采用新的分区策略也就是说同一个表会有两种分区策略旧数据采用旧分区策略新数据采用新新分区策略, 在元数据里两个分区策略相互独立不重合。
在查询数据的时候如果存在跨分区策略的情况则会解析成两个不同执行计划如Iceberg官网提供图所示 图中booking_table表2008年按月分区进入2009年后改为按天分区这种中分区策略共存于该表中。
借助Iceberg的隐藏分区Hidden Partition在写SQL 查询的时候不需要在SQL中特别指定分区过滤条件Iceberg会自动分区过滤掉不需要的数据。
Iceberg分区演化操作同样是一个元数据操作, 不会重写数据文件。
列顺序演化Sort Order Evolution
Iceberg可以在一个已经存在的表上修改排序策略。修改了排序策略之后, 旧数据依旧采用老排序策略不变。往Iceberg里写数据的计算引擎总是会选择最新的排序策略, 但是当排序的代价极其高昂的时候, 就不进行排序了。
隐藏分区Hidden Partition
Iceberg的分区信息并不需要人工维护, 它可以被隐藏起来. 不同其他类似Hive 的分区策略, Iceberg的分区字段/策略通过某一个字段计算出来可以不是表的字段和表数据存储目录也没有关系。在建表或者修改分区策略之后新的数据会自动计算所属于的分区。在查询的时候同样不用关系表的分区是什么字段/策略只需要关注业务逻辑Iceberg会自动过滤不需要的分区数据。
正是由于Iceberg的分区信息和表数据存储目录是独立的使得Iceberg的表分区可以被修改,而且不和涉及到数据迁移。
镜像数据查询Time Travel
Iceberg提供了查询表历史某一时间点数据镜像snapshot的能力。通过该特性可以将最新的SQL逻辑应用到历史数据上。
支持事务ACID
Iceberg通过提供事务ACID的机制使其具备了upsert的能力并且使得边写边读成为可能从而数据可以更快的被下游组件消费。通过事务保证了下游组件只能消费已commit的数据而不会读到部分甚至未提交的数据。
基于乐观锁的并发支持
Iceberg基于乐观锁提供了多个程序并发写入的能力并且保证数据线性一致。
文件级数据剪裁
Iceberg的元数据里面提供了每个数据文件的一些统计信息比如最大值最小值Count计数等等。因此查询SQL的过滤条件除了常规的分区列过滤甚至可以下推到文件级别大大加快了查询效率。
其他数据湖框架的对比 存储结构 数据文件 data files
数据文件是Apache Iceberg表真实存储数据的文件一般是在表的数据存储目录的data目录下如果我们的文件格式选择的是parquet,那么文件是以“.parquet”结尾。
例如00000-0-atguigu_20230203160458_22ee74c9-643f-4b27-8fc1-9cbd5f64dad4-job_1675409881387_0007-00001.parquet 就是一个数据文件。
Iceberg每次更新会产生多个数据文件data files。
表快照 Snapshot
快照代表一张表在某个时刻的状态。每个快照里面会列出表在某个时刻的所有 data files 列表。data files是存储在不同的manifest files里面manifest files是存储在一个Manifest list文件里面而一个Manifest list文件代表一个快照。
清单列表 Manifest list
manifest list是一个元数据文件它列出构建表快照Snapshot的清单Manifest file。这个元数据文件中存储的是Manifest file列表每个Manifest file占据一行。每行中存储了Manifest file的路径、其存储的数据文件data files的分区范围增加了几个数文件、删除了几个数据文件等信息这些信息可以用来在查询时提供过滤加快速度。
例如snap-6746266566064388720-1-52f2f477-2585-4e69-be42-bbad9a46ed17.avro就是一个Manifest List文件。
清单文件 Manifest file
Manifest file也是一个元数据文件它列出组成快照snapshot的数据文件data files的列表信息。每行都是每个数据文件的详细描述包括数据文件的状态、文件路径、分区信息、列级别的统计信息比如每列的最大最小值、空值数等、文件的大小以及文件里面数据行数等信息。其中列级别的统计信息可以在扫描表数据时过滤掉不必要的文件。
Manifest file是以avro格式进行存储的以“.avro”后缀结尾例如52f2f477-2585-4e69-be42-bbad9a46ed17-m0.avro。
与 Hive集成
环境准备
1Hive与Iceberg的版本对应关系如下
Hive 版本官方推荐Hive版本Iceberg 版本2.x2.3.80.8.0-incubating – 1.1.03.x3.1.20.10.0 – 1.1.0
Iceberg与Hive 2和Hive 3.1.2/3的集成支持以下特性 创建表 删除表 读取表 插入表INSERT into 更多功能需要Hive 4.x目前alpha版本才能支持。 2上传jar包拷贝到Hive的auxlib目录中
mkdir auxlib
cp iceberg-hive-runtime-1.1.0.jar /opt/module/hive/auxlib
cp libfb303-0.9.3.jar /opt/module/hive/auxlibcp iceberg-hive-runtime-1.1.0.jar /opt/module/hive/auxlibcp libfb303-0.9.3.jar /opt/module/hive/auxlib3修改hive-site.xml添加配置项
propertynameiceberg.engine.hive.enabled/namevaluetrue/value
/propertypropertynamehive.aux.jars.path/namevalue/opt/module/hive/auxlib/value
/property使用TEZ引擎注意事项 使用Hive版本3.1.2需要TEZ版本0.10.1 指定tez更新配置 propertynametez.mrreader.config.update.properties/namevaluehive.io.file.readcolumn.names,hive.io.file.readcolumn.ids/value
/property从Iceberg 0.11.0开始如果Hive使用Tez引擎需要关闭向量化执行 propertynamehive.vectorized.execution.enabled/namevaluefalse/value
/property4启动HMS服务
5启动 Hadoop
创建和管理 Catalog
Iceberg支持多种不同的Catalog类型例如:Hive、Hadoop、亚马逊的AWS Glue和自定义Catalog。
根据不同配置分为三种情况
没有设置iceberg.catalog默认使用HiveCatalog
配置项说明iceberg.catalog.catalog_name.typeCatalog的类型: hive, hadoop, 如果使用自定义Catalog则不设置iceberg.catalog.catalog_name.catalog-implCatalog的实现类, 如果上面的type没有设置则此参数必须设置iceberg.catalog.catalog_name.keyCatalog的其他配置项 设置了 iceberg.catalog的类型使用指定的Catalog类型如下表格 设置 iceberg.cataloglocation_based_table直接通过指定的根路径来加载Iceberg表
默认使用 HiveCatalog
CREATE TABLE iceberg_test1 (i int) STORED BY org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;INSERT INTO iceberg_test1 values(1);查看HDFS可以发现表目录在默认的hive仓库路径下。
指定 Catalog 类型
1使用 HiveCatalog
set iceberg.catalog.iceberg_hive.typehive;
set iceberg.catalog.iceberg_hive.urithrift://hadoop1:9083;
set iceberg.catalog.iceberg_hive.clients10;
set iceberg.catalog.iceberg_hive.warehousehdfs://hadoop1:8020/warehouse/iceberg-hive;CREATE TABLE iceberg_test2 (i int)
STORED BY org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
TBLPROPERTIES(iceberg.catalogiceberg_hive);INSERT INTO iceberg_test2 values(1);2使用 HadoopCatalog
set iceberg.catalog.iceberg_hadoop.typehadoop;
set iceberg.catalog.iceberg_hadoop.warehousehdfs://hadoop1:8020/warehouse/iceberg-hadoop;CREATE TABLE iceberg_test3 (i int)
STORED BY org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
LOCATION hdfs://hadoop1:8020/warehouse/iceberg-hadoop/default/iceberg_test3
TBLPROPERTIES(iceberg.catalogiceberg_hadoop);INSERT INTO iceberg_test3 values(1);指定路径加载
如果HDFS中已经存在iceberg格式表我们可以通过在Hive中创建Icerberg格式表指定对应的location路径映射数据。
CREATE EXTERNAL TABLE iceberg_test4 (i int)
STORED BY org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
LOCATION hdfs://hadoop1:8020/warehouse/iceberg-hadoop/default/iceberg_test3
TBLPROPERTIES (iceberg.cataloglocation_based_table);基本操作
创建表
1创建外部表
CREATE EXTERNAL TABLE iceberg_create1 (i int)
STORED BY org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;describe formatted iceberg_create1;2创建内部表
CREATE TABLE iceberg_create2 (i int)
STORED BY org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;describe formatted iceberg_create2;3创建分区表
CREATE EXTERNAL TABLE iceberg_create3 (id int,name string)
PARTITIONED BY (age int)
STORED BY org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;describe formatted iceberg_create3;Hive语法创建分区表不会在HMS中创建分区而是将分区数据转换为Iceberg标识分区。这种情况下不能使用Iceberg的分区转换例如days(timestamp)如果想要使用Iceberg格式表的分区转换标识分区需要使用Spark或者Flink引擎创建表。
修改表
只支持HiveCatalog表修改表属性Iceberg表属性和Hive表属性存储在HMS中是同步的。
ALTER TABLE iceberg_create1 SET TBLPROPERTIES(external.table.purgeFALSE);插入表
支持标准单表INSERT INTO操作
INSERT INTO iceberg_create2 VALUES (1);
INSERT INTO iceberg_create1 select * from iceberg_create2;在HIVE 3.x中INSERT OVERWRITE虽然能执行但其实是追加。
删除表
DROP TABLE iceberg_create1;与 Spark SQL集成
环境准备
1安装 Spark
1Spark与Iceberg的版本对应关系如下
Spark 版本Iceberg 版本2.40.7.0-incubating – 1.1.03.00.9.0 – 1.0.03.10.12.0 – 1.1.03.20.13.0 – 1.1.03.30.14.0 – 1.1.0
2上传并解压Spark安装包
tar -zxvf spark-3.3.1-bin-hadoop3.tgz -C /opt/module/
mv /opt/module/spark-3.3.1-bin-hadoop3 /opt/module/spark-3.3.13配置环境变量
sudo vim /etc/profile.d/my_env.shexport SPARK_HOME/opt/module/spark-3.3.1
export PATH$PATH:$SPARK_HOME/binsource /etc/profile.d/my_env.sh4拷贝iceberg的jar包到Spark的jars目录
cp /opt/software/iceberg/iceberg-spark-runtime-3.3_2.12-1.1.0.jar /opt/module/spark-3.3.1/jars2启动 Hadoop
Spark 配置 Catalog
Spark中支持两种Catalog的设置hive和hadoopHive Catalog就是Iceberg表存储使用Hive默认的数据路径Hadoop Catalog需要指定Iceberg格式表存储路径。
vim spark-defaults.confHive Catalog
spark.sql.catalog.hive_prod org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type hive
spark.sql.catalog.hive_prod.uri thrift://hadoop1:9083use hive_prod.db;Hadoop Catalog
spark.sql.catalog.hadoop_prod org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type hadoop
spark.sql.catalog.hadoop_prod.warehouse hdfs://hadoop1:8020/warehouse/spark-iceberguse hadoop_prod.db;SQL 操作
创建表
use hadoop_prod;
create database default;
use default;CREATE TABLE hadoop_prod.default.sample1 (id bigint COMMENT unique id,data string)
USING icebergPARTITIONED BY (partition-expressions) 配置分区 LOCATION ‘(fully-qualified-uri)’ 指定表路径 COMMENT ‘table documentation’ 配置表备注 TBLPROPERTIES (‘key’‘value’, …) 配置表属性
表属性https://iceberg.apache.org/docs/latest/configuration/
对Iceberg表的每次更改都会生成一个新的元数据文件json文件以提供原子性。默认情况下旧元数据文件作为历史文件保存不会删除。
如果要自动清除元数据文件在表属性中设置write.metadata.delete-after-commit.enabledtrue。这将保留一些元数据文件直到write.metadata.previous-versions-max并在每个新创建的元数据文件之后删除旧的元数据文件。
1创建分区表
1分区表
CREATE TABLE hadoop_prod.default.sample2 (id bigint,data string,category string)
USING iceberg
PARTITIONED BY (category)2创建隐藏分区表
CREATE TABLE hadoop_prod.default.sample3 (id bigint,data string,category string,ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category)支持的转换有: years(ts):按年划分 months(ts):按月划分 days(ts)或date(ts):等效于dateint分区 hours(ts)或date_hour(ts):等效于dateint和hour分区 bucket(N, col):按哈希值划分mod N个桶 truncate(L, col):按截断为L的值划分
字符串被截断为给定的长度
整型和长型截断为bin: truncate(10, i)生成分区0,10,20,30…
2使用 CTAS 语法建表
CREATE TABLE hadoop_prod.default.sample4
USING iceberg
AS SELECT * from hadoop_prod.default.sample3不指定分区就是无分区需要重新指定分区、表属性
CREATE TABLE hadoop_prod.default.sample5
USING iceberg
PARTITIONED BY (bucket(8, id), hours(ts), category)
TBLPROPERTIES (keyvalue)
AS SELECT * from hadoop_prod.default.sample33使用 Replace table 建表
REPLACE TABLE hadoop_prod.default.sample5
USING iceberg
AS SELECT * from hadoop_prod.default.sample3REPLACE TABLE hadoop_prod.default.sample5
USING iceberg
PARTITIONED BY (part)
TBLPROPERTIES (keyvalue)
AS SELECT * from hadoop_prod.default.sample3CREATE OR REPLACE TABLE hadoop_prod.default.sample6
USING iceberg
AS SELECT * from hadoop_prod.default.sample3删除表
对于HadoopCatalog而言运行DROP TABLE将从catalog中删除表并删除表内容。
CREATE EXTERNAL TABLE hadoop_prod.default.sample7 (id bigint COMMENT unique id,data string)
USING icebergINSERT INTO hadoop_prod.default.sample7 values(1,a)
DROP TABLE hadoop_prod.default.sample7对于HiveCatalog而言 在0.14之前运行DROP TABLE将从catalog中删除表并删除表内容。 从0.14开始DROP TABLE只会从catalog中删除表不会删除数据。为了删除表内容应该使用DROP table PURGE。
CREATE TABLE hive_prod.default.sample7 (id bigint COMMENT unique id,data string)
USING icebergINSERT INTO hive_prod.default.sample7 values(1,a)1删除表
DROP TABLE hive_prod.default.sample72删除表和数据
DROP TABLE hive_prod.default.sample7 PURGE修改表
Iceberg在Spark 3中完全支持ALTER TABLE包括: 重命名表 设置或删除表属性 添加、删除和重命名列 添加、删除和重命名嵌套字段 重新排序顶级列和嵌套结构字段 扩大int、float和decimal字段的类型 将必选列变为可选列
此外还可以使用SQL扩展来添加对分区演变的支持和设置表的写顺序。
CREATE TABLE hive_prod.default.sample1 (id bigint COMMENT unique id,data string)
USING iceberg1修改表名不支持修改HadoopCatalog的表名
ALTER TABLE hive_prod.default.sample1 RENAME TO hive_prod.default.sample22修改表属性 修改表属性 ALTER TABLE hive_prod.default.sample1 SET TBLPROPERTIES (read.split.target-size268435456
)ALTER TABLE hive_prod.default.sample1 SET TBLPROPERTIES (comment A table comment.
)删除表属性 ALTER TABLE hive_prod.default.sample1 UNSET TBLPROPERTIES (read.split.target-size)3添加列
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMNS (category string comment new_column
)-- 添加struct类型的列
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN point structx: double, y: double;-- 往struct类型的列中添加字段
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN point.z double-- 创建struct的嵌套数组列
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN points arraystructx: double, y: double;-- 在数组中的结构中添加一个字段。使用关键字element访问数组的元素列。
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN points.element.z double-- 创建一个包含Map类型的列key和value都为struct类型
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN pointsm mapstructx: int, structa: int;-- 在Map类型的value的struct中添加一个字段。
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN pointsm.value.b int在Spark 2.4.4及以后版本中可以通过添加FIRST或AFTER子句在任何位置添加列:
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN new_column1 bigint AFTER idALTER TABLE hadoop_prod.default.sample1
ADD COLUMN new_column2 bigint FIRST4修改列 修改列名 ALTER TABLE hadoop_prod.default.sample1 RENAME COLUMN data TO data1Alter Column修改类型只允许安全的转换 ALTER TABLE hadoop_prod.default.sample1
ADD COLUMNS (idd int)
ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN idd TYPE bigintAlter Column 修改列的注释 ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id TYPE double COMMENT a
ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id COMMENT bAlter Column修改列的顺序 ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id FIRST
ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN new_column2 AFTER new_column1Alter Column修改列是否允许为null ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id DROP NOT NULLALTER COLUMN不用于更新struct类型。使用ADD COLUMN和DROP COLUMN添加或删除struct类型的字段。
5删除列
ALTER TABLE hadoop_prod.default.sample1 DROP COLUMN idd
ALTER TABLE hadoop_prod.default.sample1 DROP COLUMN point.z6添加分区Spark3需要配置扩展
vim spark-default.conf
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions重新进入spark-sql shell
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD category ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD bucket(16, id)
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD truncate(data, 4)
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD years(ts)ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD bucket(16, id) AS shard7删除分区Spark3需要配置扩展
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD category
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD bucket(16, id)
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD truncate(data, 4)
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD years(ts)
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD shard注意尽管删除了分区但列仍然存在于表结构中。
删除分区字段是元数据操作不会改变任何现有的表数据。新数据将被写入新的分区但现有数据将保留在旧的分区布局中。
当分区发生变化时动态分区覆盖行为也会发生变化。例如如果按天划分分区而改为按小时划分分区那么覆盖将覆盖每小时划分的分区而不再覆盖按天划分的分区。
删除分区字段时要小心可能导致元数据查询失败或产生不同的结果。
8修改分区Spark3需要配置扩展
ALTER TABLE hadoop_prod.default.sample1 REPLACE PARTITION FIELD bucket(16, id) WITH bucket(8, id)9修改表的写入顺序
ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category, id
ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category ASC, id DESC
ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST表写顺序不能保证查询的数据顺序。它只影响数据写入表的方式。
WRITE ORDERED BY设置了一个全局排序即跨任务的行排序就像在INSERT命令中使用ORDER BY一样:
INSERT INTO hadoop_prod.default.sample1
SELECT id, data, category, ts FROM another_table
ORDER BY ts, category要在每个任务内排序而不是跨任务排序使用local ORDERED BY:
ALTER TABLE hadoop_prod.default.sample1 WRITE LOCALLY ORDERED BY category, id10按分区并行写入
ALTER TABLE hadoop_prod.default.sample1 WRITE DISTRIBUTED BY PARTITION
ALTER TABLE hadoop_prod.default.sample1 WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY category, id插入数据
CREATE TABLE hadoop_prod.default.a (id bigint,count bigint)
USING icebergCREATE TABLE hadoop_prod.default.b (id bigint,count bigint,flag string)
USING iceberg1Insert Into
INSERT INTO hadoop_prod.default.a VALUES (1, 1), (2, 2), (3, 3);
INSERT INTO hadoop_prod.default.b VALUES (1, 1, a), (2, 2, b), (4, 4, d);2MERGE INTO行级更新
MERGE INTO hadoop_prod.default.a t
USING (SELECT * FROM hadoop_prod.default.b) u ON t.id u.id
WHEN MATCHED AND u.flagb THEN UPDATE SET t.count t.count u.count
WHEN MATCHED AND u.flaga THEN DELETE
WHEN NOT MATCHED THEN INSERT (id,count) values (u.id,u.count)查询数据
1普通查询
SELECT count(1) as count, data
FROM local.db.table
GROUP BY data2查询元数据
// 查询表快照
SELECT * FROM hadoop_prod.default.a.snapshots// 查询数据文件信息
SELECT * FROM hadoop_prod.default.a.files// 查询表历史
SELECT * FROM hadoop_prod.default.a.history// 查询 manifest
ELECT * FROM hadoop_prod.default.a.manifests存储过程
Procedures可以通过CALL从任何已配置的Iceberg Catalog中使用。所有Procedures都在namespace中。
1语法
按照参数名传参
CALL catalog_name.system.procedure_name(arg_name_2 arg_2, arg_name_1 arg_1)当按位置传递参数时如果结束参数是可选的则只有结束参数可以省略。
CALL catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n)2快照管理 回滚到指定的快照id CALL hadoop_prod.system.rollback_to_snapshot(default.a, 7601163594701794741)回滚到指定时间的快照 CALL hadoop_prod.system.rollback_to_timestamp(db.sample, TIMESTAMP 2021-06-30 00:00:00.000)设置表的当前快照ID CALL hadoop_prod.system.set_current_snapshot(db.sample, 1)从快照变为当前表状态 CALL hadoop_prod.system.cherrypick_snapshot(default.a, 7629160535368763452)
CALL hadoop_prod.system.cherrypick_snapshot(snapshot_id 7629160535368763452, table default.a )3元数据管理 删除早于指定日期和时间的快照但保留最近100个快照 CALL hive_prod.system.expire_snapshots(db.sample, TIMESTAMP 2021-06-30 00:00:00.000, 100)删除Iceberg表中任何元数据文件中没有引用的文件 #列出所有需要删除的候选文件
CALL catalog_name.system.remove_orphan_files(table db.sample, dry_run true)
#删除指定目录中db.sample表不知道的任何文件
CALL catalog_name.system.remove_orphan_files(table db.sample, location tablelocation/data)合并数据文件合并小文件 CALL catalog_name.system.rewrite_data_files(db.sample)
CALL catalog_name.system.rewrite_data_files(table db.sample, strategy sort, sort_order id DESC NULLS LAST,name ASC NULLS FIRST)
CALL catalog_name.system.rewrite_data_files(table db.sample, strategy sort, sort_order zorder(c1,c2))
CALL catalog_name.system.rewrite_data_files(table db.sample, options map(min-input-files,2))
CALL catalog_name.system.rewrite_data_files(table db.sample, where id 3 and name foo)重写表清单来优化执行计划 CALL catalog_name.system.rewrite_manifests(db.sample)#重写表db中的清单。并禁用Spark缓存的使用。这样做可以避免执行程序上的内存问题。
CALL catalog_name.system.rewrite_manifests(db.sample, false)4迁移表 快照 CALL catalog_name.system.snapshot(db.sample, db.snap)
CALL catalog_name.system.snapshot(db.sample, db.snap, /tmp/temptable/)迁移 CALL catalog_name.system.migrate(spark_catalog.db.sample, map(foo, bar))
CALL catalog_name.system.migrate(db.sample)添加数据文件 CALL spark_catalog.system.add_files(table db.tbl,source_table db.src_tbl,partition_filter map(part_col_1, A)
)CALL spark_catalog.system.add_files(table db.tbl,source_table parquet.path/to/table
)5元数据信息 获取指定快照的父快照id CALL spark_catalog.system.ancestors_of(db.tbl)获取指定快照的所有祖先快照 CALL spark_catalog.system.ancestors_of(db.tbl, 1)
CALL spark_catalog.system.ancestors_of(snapshot_id 1, table db.tbl)DataFrame 操作
环境准备
1创建maven工程配置pom文件
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.atguigu.iceberg/groupIdartifactIdspark-iceberg-demo/artifactIdversion1.0-SNAPSHOT/versionpropertiesscala.binary.version2.12/scala.binary.versionspark.version3.3.1/spark.versionmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.target/propertiesdependencies!-- Spark的依赖引入 --dependencygroupIdorg.apache.spark/groupIdartifactIdspark-core_${scala.binary.version}/artifactIdscopeprovided/scopeversion${spark.version}/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql_${scala.binary.version}/artifactIdscopeprovided/scopeversion${spark.version}/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-hive_${scala.binary.version}/artifactIdscopeprovided/scopeversion${spark.version}/version/dependency!--fastjson 1.2.80 存在安全漏洞,--dependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.83/version/dependency!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-runtime-3.3 --dependencygroupIdorg.apache.iceberg/groupIdartifactIdiceberg-spark-runtime-3.3_2.12/artifactIdversion1.1.0/version/dependency/dependenciesbuildplugins!-- assembly打包插件 --plugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-assembly-plugin/artifactIdversion3.0.0/versionexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executionsconfigurationarchivemanifest/manifest/archivedescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configuration/plugin!--Maven编译scala所需依赖--plugingroupIdnet.alchim31.maven/groupIdartifactIdscala-maven-plugin/artifactIdversion3.2.2/versionexecutionsexecutiongoalsgoalcompile/goalgoaltestCompile/goal/goals/execution/executions/plugin/plugins/build
/project2配置Catalog
val spark: SparkSession SparkSession.builder().master(local).appName(this.getClass.getSimpleName)//指定hive catalog, catalog名称为iceberg_hive.config(spark.sql.catalog.iceberg_hive, org.apache.iceberg.spark.SparkCatalog).config(spark.sql.catalog.iceberg_hive.type, hive).config(spark.sql.catalog.iceberg_hive.uri, thrift://hadoop1:9083)// .config(iceberg.engine.hive.enabled, true)//指定hadoop catalogcatalog名称为iceberg_hadoop .config(spark.sql.catalog.iceberg_hadoop, org.apache.iceberg.spark.SparkCatalog).config(spark.sql.catalog.iceberg_hadoop.type, hadoop).config(spark.sql.catalog.iceberg_hadoop.warehouse, hdfs://hadoop1:8020/warehouse/spark-iceberg).getOrCreate()读取表
1加载表
spark.read
.format(iceberg)
.load(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a)
.show()或
// 仅支持Spark3.0以上
spark.table(iceberg_hadoop.default.a)
.show()2时间旅行指定时间查询
spark.read.option(as-of-timestamp, 499162860000).format(iceberg)
.load(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a)
.show()3时间旅行指定快照id查询
spark.read.option(snapshot-id, 7601163594701794741L).format(iceberg)
.load(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a)
.show()4增量查询
spark.read
.format(iceberg)
.option(start-snapshot-id, 10963874102873)
.option(end-snapshot-id, 63874143573109)
.load(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a)
.show()查询的表只能是append的方式写数据不支持replace, overwrite, delete操作。
检查表
1查询元数据
spark.read.format(iceberg).load(iceberg_hadoop.default.a.files)
spark.read.format(iceberg).load(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a#files)2元数据表时间旅行查询
spark.read
.format(iceberg)
.option(snapshot-id, 7601163594701794741L)
.load(iceberg_hadoop.default.a.files)写入表
1创建样例类准备DF
case class Sample(id:Int,data:String,category:String)val df: DataFrame spark.createDataFrame(Seq(Sample(1,A, a), Sample(2,B, b), Sample(3,C, c)))2插入数据并建表
df.writeTo(iceberg_hadoop.default.table1).create()import spark.implicits._
df.writeTo(iceberg_hadoop.default.table1).tableProperty(write.format.default, orc).partitionedBy($category).createOrReplace()3append追加
df.writeTo(iceberg_hadoop.default.table1).append()4动态分区覆盖
df.writeTo(iceberg_hadoop.default.table1).overwritePartitions()5静态分区覆盖
import spark.implicits._
df.writeTo(iceberg_hadoop.default.table1).overwrite($category c)6插入分区表且分区内排序
df.sortWithinPartitions(category).writeTo(iceberg_hadoop.default.table1).append()维护表
1获取Table对象
1HadoopCatalog
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;val conf new Configuration()
val catalog new HadoopCatalog(conf,hdfs://hadoop1:8020/warehouse/spark-iceberg)
val table: Table catalog.loadTable(TableIdentifier.of(db,table1))2HiveCatalog
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;val catalog new HiveCatalog()
catalog.setConf(spark.sparkContext.hadoopConfiguration)val properties new util.HashMap[String,String]()
properties.put(warehouse, hdfs://hadoop1:8020/warehouse/spark-iceberg)
properties.put(uri, thrift://hadoop1:9083)catalog.initialize(hive, properties)
val table: Table catalog.loadTable(TableIdentifier.of(db, table1))2快照过期清理
每次写入Iceberg表都会创建一个表的新快照或版本。快照可以用于时间旅行查询或者可以将表回滚到任何有效的快照。建议设置快照过期时间过期的旧快照将从元数据中删除不再可用于时间旅行查询。
// 1天过期时间
val tsToExpire: Long System.currentTimeMillis() - (1000 * 60 * 60 * 24)table.expireSnapshots().expireOlderThan(tsToExpire).commit()或使用SparkActions来设置过期
//SparkActions可以并行运行大型表的表过期设置
SparkActions.get().expireSnapshots(table).expireOlderThan(tsToExpire).execute()3删除无效文件
在Spark和其他分布式处理引擎中任务或作业失败可能会留下未被表元数据引用的文件在某些情况下正常的快照过期可能无法确定不再需要并删除该文件。
SparkActions.get().deleteOrphanFiles(table).execute()4合并小文件
数据文件过多会导致更多的元数据存储在清单文件中而较小的数据文件会导致不必要的元数据量和更低效率的文件打开成本。
SparkActions.get().rewriteDataFiles(table).filter(Expressions.equal(category, a)).option(target-file-size-bytes, 1024L.toString) //1KB.execute()与 Flink SQL 集成
Apache Iceberg同时支持Apache Flink的DataStream API和Table API。
环境准备
1安装 Flink
1Flink与Iceberg的版本对应关系如下
Flink 版本Iceberg 版本1.110.9.0 – 0.12.11.120.12.0 – 0.13.11.130.13.0 – 1.0.01.140.13.0 – 1.1.01.150.14.0 – 1.1.01.161.1.0 – 1.1.0
2上传并解压Flink安装包
tar -zxvf flink-1.16.0-bin-scala_2.12.tgz -C /opt/module/3配置环境变量
sudo vim /etc/profile.d/my_env.sh
export HADOOP_CLASSPATHhadoop classpath
source /etc/profile.d/my_env.sh4拷贝iceberg的jar包到Flink的lib目录
cp /opt/software/iceberg/iceberg-flink-runtime-1.16-1.1.0.jar /opt/module/flink-1.16.0/lib2启动 Hadoop
3启动 sql-client
1修改flink-conf.yaml配置
vim /opt/module/flink-1.16.0/conf/flink-conf.yamlclassloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4state.backend: rocksdb
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://hadoop1:8020/ckps
state.backend.incremental: true2local模式
1修改workers
vim /opt/module/flink-1.16.0/conf/workers
#表示会在本地启动3个TaskManager的 local集群
localhost
localhost
localhost2启动Flink
/opt/module/flink-1.16.0/bin/start-cluster.sh查看webuihttp://hadoop1:8081
3启动Flink的sql-client
/opt/module/flink-1.16.0/bin/sql-client.sh embedded创建和使用 Catalog
语法说明
CREATE CATALOG catalog_name WITH (typeiceberg,config_keyconfig_value
); type: 必须是iceberg。必须 catalog-type: 内置了hive和hadoop两种catalog也可以使用catalog-impl来自定义catalog。可选 catalog-impl: 自定义catalog实现的全限定类名。如果未设置catalog-type则必须设置。可选 property-version: 描述属性版本的版本号。此属性可用于向后兼容以防属性格式更改。当前属性版本为1。可选 cache-enabled: 是否启用目录缓存默认值为true。可选 cache.expiration-interval-ms: 本地缓存catalog条目的时间(以毫秒为单位)负值如-1表示没有时间限制不允许设为0。默认值为-1。可选
Hive Catalog
1上传hive connector到flink的lib中
cp flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar /opt/module/flink-1.16.0/lib/2启动hive metastore服务
hive --service metastore3创建hive catalog
重启flink集群重新进入sql-client
CREATE CATALOG hive_catalog WITH (typeiceberg,catalog-typehive,urithrift://hadoop1:9083,clients5,property-version1,warehousehdfs://hadoop1:8020/warehouse/iceberg-hive
);use catalog hive_catalog;use catalog hive_catalog; uri: Hive metastore的thrift uri。(必选) clients:Hive metastore客户端池大小默认为2。(可选) warehouse: 数仓目录。 hive-conf-dir:包含hive-site.xml配置文件的目录路径hive-site.xml中hive.metastore.warehouse.dir 的值会被warehouse覆盖。 hadoop-conf-dir:包含core-site.xml和hdfs-site.xml配置文件的目录路径。
Hadoop Catalog
Iceberg还支持HDFS中基于目录的catalog可以使用’catalog-type’hadoop’配置。
CREATE CATALOG hadoop_catalog WITH (typeiceberg,catalog-typehadoop,warehousehdfs://hadoop1:8020/warehouse/iceberg-hadoop,property-version1
);use catalog hadoop_catalog;warehouse:存放元数据文件和数据文件的HDFS目录。必需
配置sql-client初始化文件
vim /opt/module/flink-1.16.0/conf/sql-client-init.sqlCREATE CATALOG hive_catalog WITH (typeiceberg,catalog-typehive,urithrift://hadoop1:9083,warehousehdfs://hadoop1:8020/warehouse/iceberg-hive
);USE CATALOG hive_catalog;后续启动sql-client时加上 -i sql文件路径 即可完成catalog的初始化。
/opt/module/flink-1.16.0/bin/sql-client.sh embedded -i conf/sql-client-init.sqlDDL 语句
创建数据库
CREATE DATABASE iceberg_db;
USE iceberg_db;创建表
CREATE TABLE hive_catalog.default.sample (id BIGINT COMMENT unique id,data STRING
);建表命令现在支持最常用的flink建表语法包括: PARTITION BY (column1, column2, …)配置分区apache flink还不支持隐藏分区。 COMMENT ‘table document’指定表的备注 WITH (‘key’‘value’, …)设置表属性 目前不支持计算列、watermark支持主键。 1创建分区表
CREATE TABLE hive_catalog.default.sample (id BIGINT COMMENT unique id,data STRING
) PARTITIONED BY (data);Apache Iceberg支持隐藏分区但Apache flink不支持在列上通过函数进行分区现在无法在flink DDL中支持隐藏分区。
2使用LIKE语法建表
LIKE语法用于创建一个与另一个表具有相同schema、分区和属性的表。
CREATE TABLE hive_catalog.default.sample (id BIGINT COMMENT unique id,data STRING
);CREATE TABLE hive_catalog.default.sample_like LIKE hive_catalog.default.sample;修改表
1修改表属性
ALTER TABLE hive_catalog.default.sample SET (write.format.defaultavro);2修改表名
ALTER TABLE hive_catalog.default.sample RENAME TO hive_catalog.default.new_sample;删除表
DROP TABLE hive_catalog.default.sample;插入语句
INSERT INTO
INSERT INTO hive_catalog.default.sample VALUES (1, a);
INSERT INTO hive_catalog.default.sample SELECT id, data from sample2;INSERT OVERWRITE
仅支持Flink的Batch模式
SET execution.runtime-mode batch;
INSERT OVERWRITE sample VALUES (1, a);
INSERT OVERWRITE hive_catalog.default.sample PARTITION(dataa) SELECT 6;UPSERT
当将数据写入v2表格式时Iceberg支持基于主键的UPSERT。有两种方法可以启用upsert。
1建表时指定
CREATE TABLE hive_catalog.test1.sample5 (id INT UNIQUE COMMENT unique id,data STRING NOT NULL,PRIMARY KEY(id) NOT ENFORCED
) with (format-version2, write.upsert.enabledtrue
);2插入时指定
INSERT INTO tableName /* OPTIONS(upsert-enabledtrue) */
...插入的表format-version需要为2。
OVERWRITE和UPSERT不能同时设置。在UPSERT模式下如果对表进行分区则分区字段必须也是主键。
3读取Kafka流upsert插入到iceberg表中
create table default_catalog.default_database.kafka(id int,data string
) with (connector kafka,topic test111,properties.zookeeper.connect hadoop1:2181,properties.bootstrap.servers hadoop1:9092,format json,properties.group.idiceberg,scan.startup.modeearliest-offset
);INSERT INTO hive_catalog.test1.sample5 SELECT * FROM default_catalog.default_database.kafka;查询语句
Iceberg支持Flink的流式和批量读取。
Batch模式
SET execution.runtime-mode batch;
select * from sample;Streaming模式
SET execution.runtime-mode streaming;
SET table.dynamic-table-options.enabledtrue;
SET sql-client.execution.result-modetableau;1从当前快照读取所有记录然后从该快照读取增量数据
SELECT * FROM sample5 /* OPTIONS(streamingtrue, monitor-interval1s)*/ ;2读取指定快照id不包含后的增量数据
SELECT * FROM sample /* OPTIONS(streamingtrue, monitor-interval1s, start-snapshot-id3821550127947089987)*/ ;monitor-interval: 连续监控新提交数据文件的时间间隔默认为10s。 start-snapshot-id: 流作业开始的快照id。
**注意**如果是无界数据流式upsert进iceberg表读kafkaupsert进iceberg表那么再去流读iceberg表会存在读不出数据的问题。如果无界数据流式append进iceberg表读kafkaappend进iceberg表那么流读该iceberg表可以正常看到结果。
与Flink集成的不足
支持的特性Flink备注SQL create catalog√SQL create database√SQL create table√SQL create table like√SQL alter table√只支持修改表属性不支持更改列和分区SQL drop_table√SQL select√支持流式和批处理模式SQL insert into√支持流式和批处理模式SQL insert overwrite√DataStream read√DataStream append√DataStream overwrite√Metadata tables支持Java API不支持Flink SQLRewrite files action√ 不支持创建隐藏分区的Iceberg表。 不支持创建带有计算列的Iceberg表。 不支持创建带watermark的Iceberg表。 不支持添加列删除列重命名列更改列。 Iceberg目前不支持Flink SQL 查询表的元数据信息需要使用Java API 实现。
与 Flink DataStream 集成
环境准备
1配置pom文件
新建Maven工程pom文件配置如下
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.atguigu.iceberg/groupIdartifactIdflink-iceberg-demo/artifactIdversion1.0-SNAPSHOT/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetflink.version1.16.0/flink.versionjava.version1.8/java.versionscala.binary.version2.12/scala.binary.versionslf4j.version1.7.30/slf4j.version/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/versionscopeprovided/scope !--不会打包到依赖中只参与编译不参与运行 --/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency!--idea运行时也有webui--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-runtime-web/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion${slf4j.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion${slf4j.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-to-slf4j/artifactIdversion2.14.0/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-rocksdb/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion3.1.3/versionscopeprovided/scope/dependency!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-runtime-1.16 --dependencygroupIdorg.apache.iceberg/groupIdartifactIdiceberg-flink-runtime-1.16/artifactIdversion1.1.0/version/dependency/dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion3.2.4/versionexecutionsexecutionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationartifactSetexcludesexcludecom.google.code.findbugs:jsr305/excludeexcludeorg.slf4j:*/excludeexcludelog4j:*/excludeexcludeorg.apache.hadoop:*/exclude/excludes/artifactSetfiltersfilter!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --artifact*:*/artifactexcludesexcludeMETA-INF/*.SF/excludeexcludeMETA-INF/*.DSA/excludeexcludeMETA-INF/*.RSA/exclude/excludes/filter/filterstransformers combine.childrenappendtransformerimplementationorg.apache.maven.plugins.shade.resource.ServicesResourceTransformer/transformer/transformers/configuration/execution/executions/plugin/plugins/build
/project2配置log4j
resources目录下新建log4j.properties。
log4j.rootLoggererror,stdout
log4j.appender.stdoutorg.apache.log4j.ConsoleAppender
log4j.appender.stdout.targetSystem.out
log4j.appender.stdout.layoutorg.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern%d %p [%c] - %m%n读取数据
常规Source写法
1Batch方式
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader TableLoader.fromHadoopTable(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a);
DataStreamRowData batch FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();batch.map(r - Tuple2.of(r.getLong(0),r.getLong(1) )).returns(Types.TUPLE(Types.LONG,Types.LONG)).print();env.execute(Test Iceberg Read);2Streaming方式
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader TableLoader.fromHadoopTable(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a);
DataStreamRowData stream FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(true).startSnapshotId(3821550127947089987L).build();stream.map(r - Tuple2.of(r.getLong(0),r.getLong(1) )).returns(Types.TUPLE(Types.LONG,Types.LONG)).print();env.execute(Test Iceberg Read);FLIP-27 Source写法
1Batch方式
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader TableLoader.fromHadoopTable(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a);IcebergSourceRowData source1 IcebergSource.forRowData().tableLoader(tableLoader).assignerFactory(new SimpleSplitAssignerFactory()).build();DataStreamRowData batch env.fromSource(Source1,WatermarkStrategy.noWatermarks(),My Iceberg Source,TypeInformation.of(RowData.class));batch.map(r - Tuple2.of(r.getLong(0), r.getLong(1))).returns(Types.TUPLE(Types.LONG, Types.LONG)).print();env.execute(Test Iceberg Read);2Streaming方式
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader TableLoader.fromHadoopTable(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a);IcebergSource source2 IcebergSource.forRowData().tableLoader(tableLoader).assignerFactory(new SimpleSplitAssignerFactory()).streaming(true).streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT).monitorInterval(Duration.ofSeconds(60)).build();DataStreamRowData stream env.fromSource(Source2,WatermarkStrategy.noWatermarks(),My Iceberg Source,TypeInformation.of(RowData.class));stream.map(r - Tuple2.of(r.getLong(0), r.getLong(1))).returns(Types.TUPLE(Types.LONG, Types.LONG)).print();env.execute(Test Iceberg Read);写入数据
目前支持DataStreamRowData和DataStreamRow格式的数据流写入Iceberg表。
1写入方式支持 append、overwrite、upsert
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);SingleOutputStreamOperatorRowData input env.fromElements().map(new MapFunctionString, RowData() {Overridepublic RowData map(String s) throws Exception {GenericRowData genericRowData new GenericRowData(2);genericRowData.setField(0, 99L);genericRowData.setField(1, 99L);return genericRowData;}});TableLoader tableLoader TableLoader.fromHadoopTable(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a);FlinkSink.forRowData(input).tableLoader(tableLoader).append() // append方式//.overwrite(true) // overwrite方式//.upsert(true) // upsert方式;env.execute(Test Iceberg DataStream);2写入选项
FlinkSink.forRowData(input).tableLoader(tableLoader).set(write-format, orc).set(FlinkWriteOptions.OVERWRITE_MODE, true);可配置选项如下
选项默认值说明write-formatParquet同write.format.default写入操作使用的文件格式Parquet, avro或orctarget-file-size-bytes536870912512MB同write.target-file-size-bytes控制生成的文件的大小目标大约为这么多字节upsert-enabled同write.upsert.enabledoverwrite-enabledfalse覆盖表的数据不能和UPSERT模式同时开启distribution-modeNone同 write.distribution-mode定义写数据的分布方式: none:不打乱行; hash:按分区键散列分布;range如果表有SortOrder则通过分区键或排序键分配compression-codec同 write.(fileformat).compression-codeccompression-level同 write.(fileformat).compression-levelcompression-strategy同write.orc.compression-strategy
合并小文件
Iceberg现在不支持在flink sql中检查表需要使用Iceberg提供的Java API来读取元数据来获得表信息。可以通过提交Flink批处理作业将小文件重写为大文件
import org.apache.iceberg.flink.actions.Actions;// 1.获取 Table对象
// 1.1 创建 catalog对象
Configuration conf new Configuration();
HadoopCatalog hadoopCatalog new HadoopCatalog(conf, hdfs://hadoop1:8020/warehouse/spark-iceberg);// 1.2 通过 catalog加载 Table对象
Table table hadoopCatalog.loadTable(TableIdentifier.of(default, a));// 有Table对象就可以获取元数据、进行维护表的操作
// System.out.println(table.history());
// System.out.println(table.expireSnapshots().expireOlderThan());// 2.通过 Actions 来操作 合并
Actions.forTable(table).rewriteDataFiles().targetSizeInBytes(1024L).execute();得到Table对象就可以获取元数据、进行维护表的操作。更多Iceberg提供的API操作考https://iceberg.apache.org/docs/latest/api/