当前位置: 首页 > news >正文

网站开发用什么写php mysql 企业网站源码

网站开发用什么写,php mysql 企业网站源码,战略网页游戏开服表,中国建设造价协会网站文章目录 简介概述作用特性数据存储、计算引擎插件化实时流批一体数据表演化#xff08;Table Evolution#xff09;模式演化#xff08;Schema Evolution#xff09;分区演化#xff08;Partition Evolution#xff09;列顺序演化#xff08;Sort Order Evolution… 文章目录 简介概述作用特性数据存储、计算引擎插件化实时流批一体数据表演化Table Evolution模式演化Schema Evolution分区演化Partition Evolution列顺序演化Sort Order Evolution隐藏分区Hidden Partition镜像数据查询Time Travel支持事务ACID基于乐观锁的并发支持文件级数据剪裁 其他数据湖框架的对比 存储结构数据文件 data files表快照 Snapshot清单列表 Manifest list清单文件 Manifest file 与 Hive集成环境准备创建和管理 Catalog默认使用 HiveCatalog指定 Catalog 类型指定路径加载 基本操作创建表修改表插入表删除表 与 Spark SQL集成环境准备Spark 配置 CatalogHive CatalogHadoop Catalog SQL 操作创建表删除表修改表插入数据查询数据存储过程 DataFrame 操作环境准备读取表检查表写入表维护表 与 Flink SQL 集成环境准备创建和使用 Catalog语法说明Hive CatalogHadoop Catalog配置sql-client初始化文件 DDL 语句创建数据库创建表修改表删除表 插入语句INSERT INTOINSERT OVERWRITEUPSERT 查询语句Batch模式Streaming模式 与Flink集成的不足 与 Flink DataStream 集成环境准备读取数据常规Source写法FLIP-27 Source写法 写入数据合并小文件 简介 概述 为了解决数据存储和计算引擎之间的适配的问题Netflix开发了Iceberg2018年11月16日进入Apache孵化器2020 年5月19日从孵化器毕业成为Apache的顶级项目。 Iceberg是一个面向海量数据分析场景的开放表格式Table Format。表格式Table Format可以理解为元数据以及数据文件的一种组织方式处于计算框架FlinkSpark…之下数据文件之上。 作用 大数据领域发展至今已经经历了相当长时间的发展和探索虽然大数据技术的出现和迭代降低了用户处理海量数据的门槛但是有一个问题不能忽视数据格式对不同引擎适配的对接。 也就是说我们在使用不同的引擎进行计算时需要将数据根据引擎进行适配。这是相当棘手的问题。 为此出现了一种新的解决方案介于上层计算引擎和底层存储格式之间的一个中间层。这个中间层不是数据存储的方式只是定义了数据的元数据组织方式并且向引擎层面提供统一的类似传统数据库中表的语义。它的底层仍然是Parquet、ORC等存储格式。基于此Netflix开发了Iceberg目前已经是Apache的顶级项目。 特性 数据存储、计算引擎插件化 Iceberg提供一个开放通用的表格式Table Format实现方案不和特定的数据存储、计算引擎绑定。目前大数据领域的常见数据存储HDFS、S3…计算引擎Flink、Spark…都可以接入Iceberg。 在生产环境中可选择不同的组件搭使用。甚至可以不通过计算引擎直接读取存在文件系统上的数据。 实时流批一体 Iceberg上游组件将数据写入完成后下游组件及时可读可查询。可以满足实时场景。并且Iceberg同时提供了流/批读接口、流/批写接口。可以在同一个流程里, 同时处理流数据和批数据大大简化了ETL链路。 数据表演化Table Evolution Iceberg可以通过SQL的方式进行表级别模式演进。进行这些操作的时候代价极低。 不存在读出数据重新写入或者迁移数据这种费时费力的操作。 比如在常用的Hive中如果我们需要把一个按天分区的表改成按小时分区。此时不能再原表之上直接修改只能新建一个按小时分区的表然后再把数据Insert到新的小时分区表。而且即使我们通过Rename的命令把新表的名字改为原表使用原表的上次层应用, 也可能由于分区字段修改导致需要修改 SQL这样花费的经历是非常繁琐的。 模式演化Schema Evolution Iceberg支持下面几种模式演化 ADD向表或者嵌套结构增加新列 Drop从表中或者嵌套结构中移除一列 Rename重命名表中或者嵌套结构中的一列 Update将复杂结构(struct, mapkey, value, list)中的基本类型扩展类型长度, 比如tinyint修改成int. Reorder改变列或者嵌套结构中字段的排列顺序 Iceberg保证模式演化Schema Evolution是没有副作用的独立操作流程, 一个元数据操作, 不会涉及到重写数据文件的过程。具体的如下: 增加列时候不会从另外一个列中读取已存在的的数据 删除列或者嵌套结构中字段的时候不会改变任何其他列的值 更新列或者嵌套结构中字段的时候不会改变任何其他列的值 改变列列或者嵌套结构中字段顺序的时候不会改变相关联的值 在表中Iceberg 使用唯一ID来定位每一列的信息。新增一个列的时候,会新分配给它一个唯一ID, 并且绝对不会使用已经被使用的ID。 使用名称或者位置信息来定位列的, 都会存在一些问题, 比如使用名称的话,名称可能会重复, 使用位置的话, 不能修改顺序并且废弃的字段也不能删除。 分区演化Partition Evolution Iceberg可以在一个已存在的表上直接修改因为Iceberg的查询流程并不和分区信息直接关联。 当我们改变一个表的分区策略时对应修改分区之前的数据不会改变, 依然会采用老的分区策略新的数据会采用新的分区策略也就是说同一个表会有两种分区策略旧数据采用旧分区策略新数据采用新新分区策略, 在元数据里两个分区策略相互独立不重合。 在查询数据的时候如果存在跨分区策略的情况则会解析成两个不同执行计划如Iceberg官网提供图所示 图中booking_table表2008年按月分区进入2009年后改为按天分区这种中分区策略共存于该表中。 借助Iceberg的隐藏分区Hidden Partition在写SQL 查询的时候不需要在SQL中特别指定分区过滤条件Iceberg会自动分区过滤掉不需要的数据。 Iceberg分区演化操作同样是一个元数据操作, 不会重写数据文件。 列顺序演化Sort Order Evolution Iceberg可以在一个已经存在的表上修改排序策略。修改了排序策略之后, 旧数据依旧采用老排序策略不变。往Iceberg里写数据的计算引擎总是会选择最新的排序策略, 但是当排序的代价极其高昂的时候, 就不进行排序了。 隐藏分区Hidden Partition Iceberg的分区信息并不需要人工维护, 它可以被隐藏起来. 不同其他类似Hive 的分区策略, Iceberg的分区字段/策略通过某一个字段计算出来可以不是表的字段和表数据存储目录也没有关系。在建表或者修改分区策略之后新的数据会自动计算所属于的分区。在查询的时候同样不用关系表的分区是什么字段/策略只需要关注业务逻辑Iceberg会自动过滤不需要的分区数据。 正是由于Iceberg的分区信息和表数据存储目录是独立的使得Iceberg的表分区可以被修改,而且不和涉及到数据迁移。 镜像数据查询Time Travel Iceberg提供了查询表历史某一时间点数据镜像snapshot的能力。通过该特性可以将最新的SQL逻辑应用到历史数据上。 支持事务ACID Iceberg通过提供事务ACID的机制使其具备了upsert的能力并且使得边写边读成为可能从而数据可以更快的被下游组件消费。通过事务保证了下游组件只能消费已commit的数据而不会读到部分甚至未提交的数据。 基于乐观锁的并发支持 Iceberg基于乐观锁提供了多个程序并发写入的能力并且保证数据线性一致。 文件级数据剪裁 Iceberg的元数据里面提供了每个数据文件的一些统计信息比如最大值最小值Count计数等等。因此查询SQL的过滤条件除了常规的分区列过滤甚至可以下推到文件级别大大加快了查询效率。 其他数据湖框架的对比 存储结构 数据文件 data files 数据文件是Apache Iceberg表真实存储数据的文件一般是在表的数据存储目录的data目录下如果我们的文件格式选择的是parquet,那么文件是以“.parquet”结尾。 例如00000-0-atguigu_20230203160458_22ee74c9-643f-4b27-8fc1-9cbd5f64dad4-job_1675409881387_0007-00001.parquet 就是一个数据文件。 Iceberg每次更新会产生多个数据文件data files。 表快照 Snapshot 快照代表一张表在某个时刻的状态。每个快照里面会列出表在某个时刻的所有 data files 列表。data files是存储在不同的manifest files里面manifest files是存储在一个Manifest list文件里面而一个Manifest list文件代表一个快照。 清单列表 Manifest list manifest list是一个元数据文件它列出构建表快照Snapshot的清单Manifest file。这个元数据文件中存储的是Manifest file列表每个Manifest file占据一行。每行中存储了Manifest file的路径、其存储的数据文件data files的分区范围增加了几个数文件、删除了几个数据文件等信息这些信息可以用来在查询时提供过滤加快速度。 例如snap-6746266566064388720-1-52f2f477-2585-4e69-be42-bbad9a46ed17.avro就是一个Manifest List文件。 清单文件 Manifest file Manifest file也是一个元数据文件它列出组成快照snapshot的数据文件data files的列表信息。每行都是每个数据文件的详细描述包括数据文件的状态、文件路径、分区信息、列级别的统计信息比如每列的最大最小值、空值数等、文件的大小以及文件里面数据行数等信息。其中列级别的统计信息可以在扫描表数据时过滤掉不必要的文件。 Manifest file是以avro格式进行存储的以“.avro”后缀结尾例如52f2f477-2585-4e69-be42-bbad9a46ed17-m0.avro。 与 Hive集成 环境准备 1Hive与Iceberg的版本对应关系如下 Hive 版本官方推荐Hive版本Iceberg 版本2.x2.3.80.8.0-incubating – 1.1.03.x3.1.20.10.0 – 1.1.0 Iceberg与Hive 2和Hive 3.1.2/3的集成支持以下特性 创建表 删除表 读取表 插入表INSERT into 更多功能需要Hive 4.x目前alpha版本才能支持。 2上传jar包拷贝到Hive的auxlib目录中 mkdir auxlib cp iceberg-hive-runtime-1.1.0.jar /opt/module/hive/auxlib cp libfb303-0.9.3.jar /opt/module/hive/auxlibcp iceberg-hive-runtime-1.1.0.jar /opt/module/hive/auxlibcp libfb303-0.9.3.jar /opt/module/hive/auxlib3修改hive-site.xml添加配置项 propertynameiceberg.engine.hive.enabled/namevaluetrue/value /propertypropertynamehive.aux.jars.path/namevalue/opt/module/hive/auxlib/value /property使用TEZ引擎注意事项 使用Hive版本3.1.2需要TEZ版本0.10.1 指定tez更新配置 propertynametez.mrreader.config.update.properties/namevaluehive.io.file.readcolumn.names,hive.io.file.readcolumn.ids/value /property从Iceberg 0.11.0开始如果Hive使用Tez引擎需要关闭向量化执行 propertynamehive.vectorized.execution.enabled/namevaluefalse/value /property4启动HMS服务 5启动 Hadoop 创建和管理 Catalog Iceberg支持多种不同的Catalog类型例如:Hive、Hadoop、亚马逊的AWS Glue和自定义Catalog。 根据不同配置分为三种情况 没有设置iceberg.catalog默认使用HiveCatalog 配置项说明iceberg.catalog.catalog_name.typeCatalog的类型: hive, hadoop, 如果使用自定义Catalog则不设置iceberg.catalog.catalog_name.catalog-implCatalog的实现类, 如果上面的type没有设置则此参数必须设置iceberg.catalog.catalog_name.keyCatalog的其他配置项 设置了 iceberg.catalog的类型使用指定的Catalog类型如下表格 设置 iceberg.cataloglocation_based_table直接通过指定的根路径来加载Iceberg表 默认使用 HiveCatalog CREATE TABLE iceberg_test1 (i int) STORED BY org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;INSERT INTO iceberg_test1 values(1);查看HDFS可以发现表目录在默认的hive仓库路径下。 指定 Catalog 类型 1使用 HiveCatalog set iceberg.catalog.iceberg_hive.typehive; set iceberg.catalog.iceberg_hive.urithrift://hadoop1:9083; set iceberg.catalog.iceberg_hive.clients10; set iceberg.catalog.iceberg_hive.warehousehdfs://hadoop1:8020/warehouse/iceberg-hive;CREATE TABLE iceberg_test2 (i int) STORED BY org.apache.iceberg.mr.hive.HiveIcebergStorageHandler TBLPROPERTIES(iceberg.catalogiceberg_hive);INSERT INTO iceberg_test2 values(1);2使用 HadoopCatalog set iceberg.catalog.iceberg_hadoop.typehadoop; set iceberg.catalog.iceberg_hadoop.warehousehdfs://hadoop1:8020/warehouse/iceberg-hadoop;CREATE TABLE iceberg_test3 (i int) STORED BY org.apache.iceberg.mr.hive.HiveIcebergStorageHandler LOCATION hdfs://hadoop1:8020/warehouse/iceberg-hadoop/default/iceberg_test3 TBLPROPERTIES(iceberg.catalogiceberg_hadoop);INSERT INTO iceberg_test3 values(1);指定路径加载 如果HDFS中已经存在iceberg格式表我们可以通过在Hive中创建Icerberg格式表指定对应的location路径映射数据。 CREATE EXTERNAL TABLE iceberg_test4 (i int) STORED BY org.apache.iceberg.mr.hive.HiveIcebergStorageHandler LOCATION hdfs://hadoop1:8020/warehouse/iceberg-hadoop/default/iceberg_test3 TBLPROPERTIES (iceberg.cataloglocation_based_table);基本操作 创建表 1创建外部表 CREATE EXTERNAL TABLE iceberg_create1 (i int) STORED BY org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;describe formatted iceberg_create1;2创建内部表 CREATE TABLE iceberg_create2 (i int) STORED BY org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;describe formatted iceberg_create2;3创建分区表 CREATE EXTERNAL TABLE iceberg_create3 (id int,name string) PARTITIONED BY (age int) STORED BY org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;describe formatted iceberg_create3;Hive语法创建分区表不会在HMS中创建分区而是将分区数据转换为Iceberg标识分区。这种情况下不能使用Iceberg的分区转换例如days(timestamp)如果想要使用Iceberg格式表的分区转换标识分区需要使用Spark或者Flink引擎创建表。 修改表 只支持HiveCatalog表修改表属性Iceberg表属性和Hive表属性存储在HMS中是同步的。 ALTER TABLE iceberg_create1 SET TBLPROPERTIES(external.table.purgeFALSE);插入表 支持标准单表INSERT INTO操作 INSERT INTO iceberg_create2 VALUES (1); INSERT INTO iceberg_create1 select * from iceberg_create2;在HIVE 3.x中INSERT OVERWRITE虽然能执行但其实是追加。 删除表 DROP TABLE iceberg_create1;与 Spark SQL集成 环境准备 1安装 Spark 1Spark与Iceberg的版本对应关系如下 Spark 版本Iceberg 版本2.40.7.0-incubating – 1.1.03.00.9.0 – 1.0.03.10.12.0 – 1.1.03.20.13.0 – 1.1.03.30.14.0 – 1.1.0 2上传并解压Spark安装包 tar -zxvf spark-3.3.1-bin-hadoop3.tgz -C /opt/module/ mv /opt/module/spark-3.3.1-bin-hadoop3 /opt/module/spark-3.3.13配置环境变量 sudo vim /etc/profile.d/my_env.shexport SPARK_HOME/opt/module/spark-3.3.1 export PATH$PATH:$SPARK_HOME/binsource /etc/profile.d/my_env.sh4拷贝iceberg的jar包到Spark的jars目录 cp /opt/software/iceberg/iceberg-spark-runtime-3.3_2.12-1.1.0.jar /opt/module/spark-3.3.1/jars2启动 Hadoop Spark 配置 Catalog Spark中支持两种Catalog的设置hive和hadoopHive Catalog就是Iceberg表存储使用Hive默认的数据路径Hadoop Catalog需要指定Iceberg格式表存储路径。 vim spark-defaults.confHive Catalog spark.sql.catalog.hive_prod org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.hive_prod.type hive spark.sql.catalog.hive_prod.uri thrift://hadoop1:9083use hive_prod.db;Hadoop Catalog spark.sql.catalog.hadoop_prod org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.hadoop_prod.type hadoop spark.sql.catalog.hadoop_prod.warehouse hdfs://hadoop1:8020/warehouse/spark-iceberguse hadoop_prod.db;SQL 操作 创建表 use hadoop_prod; create database default; use default;CREATE TABLE hadoop_prod.default.sample1 (id bigint COMMENT unique id,data string) USING icebergPARTITIONED BY (partition-expressions) 配置分区 LOCATION ‘(fully-qualified-uri)’ 指定表路径 COMMENT ‘table documentation’ 配置表备注 TBLPROPERTIES (‘key’‘value’, …) 配置表属性 表属性https://iceberg.apache.org/docs/latest/configuration/ 对Iceberg表的每次更改都会生成一个新的元数据文件json文件以提供原子性。默认情况下旧元数据文件作为历史文件保存不会删除。 如果要自动清除元数据文件在表属性中设置write.metadata.delete-after-commit.enabledtrue。这将保留一些元数据文件直到write.metadata.previous-versions-max并在每个新创建的元数据文件之后删除旧的元数据文件。 1创建分区表 1分区表 CREATE TABLE hadoop_prod.default.sample2 (id bigint,data string,category string) USING iceberg PARTITIONED BY (category)2创建隐藏分区表 CREATE TABLE hadoop_prod.default.sample3 (id bigint,data string,category string,ts timestamp) USING iceberg PARTITIONED BY (bucket(16, id), days(ts), category)支持的转换有: years(ts):按年划分 months(ts):按月划分 days(ts)或date(ts):等效于dateint分区 hours(ts)或date_hour(ts):等效于dateint和hour分区 bucket(N, col):按哈希值划分mod N个桶 truncate(L, col):按截断为L的值划分 字符串被截断为给定的长度 整型和长型截断为bin: truncate(10, i)生成分区0,10,20,30… 2使用 CTAS 语法建表 CREATE TABLE hadoop_prod.default.sample4 USING iceberg AS SELECT * from hadoop_prod.default.sample3不指定分区就是无分区需要重新指定分区、表属性 CREATE TABLE hadoop_prod.default.sample5 USING iceberg PARTITIONED BY (bucket(8, id), hours(ts), category) TBLPROPERTIES (keyvalue) AS SELECT * from hadoop_prod.default.sample33使用 Replace table 建表 REPLACE TABLE hadoop_prod.default.sample5 USING iceberg AS SELECT * from hadoop_prod.default.sample3REPLACE TABLE hadoop_prod.default.sample5 USING iceberg PARTITIONED BY (part) TBLPROPERTIES (keyvalue) AS SELECT * from hadoop_prod.default.sample3CREATE OR REPLACE TABLE hadoop_prod.default.sample6 USING iceberg AS SELECT * from hadoop_prod.default.sample3删除表 对于HadoopCatalog而言运行DROP TABLE将从catalog中删除表并删除表内容。 CREATE EXTERNAL TABLE hadoop_prod.default.sample7 (id bigint COMMENT unique id,data string) USING icebergINSERT INTO hadoop_prod.default.sample7 values(1,a) DROP TABLE hadoop_prod.default.sample7对于HiveCatalog而言 在0.14之前运行DROP TABLE将从catalog中删除表并删除表内容。 从0.14开始DROP TABLE只会从catalog中删除表不会删除数据。为了删除表内容应该使用DROP table PURGE。 CREATE TABLE hive_prod.default.sample7 (id bigint COMMENT unique id,data string) USING icebergINSERT INTO hive_prod.default.sample7 values(1,a)1删除表 DROP TABLE hive_prod.default.sample72删除表和数据 DROP TABLE hive_prod.default.sample7 PURGE修改表 Iceberg在Spark 3中完全支持ALTER TABLE包括: 重命名表 设置或删除表属性 添加、删除和重命名列 添加、删除和重命名嵌套字段 重新排序顶级列和嵌套结构字段 扩大int、float和decimal字段的类型 将必选列变为可选列 此外还可以使用SQL扩展来添加对分区演变的支持和设置表的写顺序。 CREATE TABLE hive_prod.default.sample1 (id bigint COMMENT unique id,data string) USING iceberg1修改表名不支持修改HadoopCatalog的表名 ALTER TABLE hive_prod.default.sample1 RENAME TO hive_prod.default.sample22修改表属性 修改表属性 ALTER TABLE hive_prod.default.sample1 SET TBLPROPERTIES (read.split.target-size268435456 )ALTER TABLE hive_prod.default.sample1 SET TBLPROPERTIES (comment A table comment. )删除表属性 ALTER TABLE hive_prod.default.sample1 UNSET TBLPROPERTIES (read.split.target-size)3添加列 ALTER TABLE hadoop_prod.default.sample1 ADD COLUMNS (category string comment new_column )-- 添加struct类型的列 ALTER TABLE hadoop_prod.default.sample1 ADD COLUMN point structx: double, y: double;-- 往struct类型的列中添加字段 ALTER TABLE hadoop_prod.default.sample1 ADD COLUMN point.z double-- 创建struct的嵌套数组列 ALTER TABLE hadoop_prod.default.sample1 ADD COLUMN points arraystructx: double, y: double;-- 在数组中的结构中添加一个字段。使用关键字element访问数组的元素列。 ALTER TABLE hadoop_prod.default.sample1 ADD COLUMN points.element.z double-- 创建一个包含Map类型的列key和value都为struct类型 ALTER TABLE hadoop_prod.default.sample1 ADD COLUMN pointsm mapstructx: int, structa: int;-- 在Map类型的value的struct中添加一个字段。 ALTER TABLE hadoop_prod.default.sample1 ADD COLUMN pointsm.value.b int在Spark 2.4.4及以后版本中可以通过添加FIRST或AFTER子句在任何位置添加列: ALTER TABLE hadoop_prod.default.sample1 ADD COLUMN new_column1 bigint AFTER idALTER TABLE hadoop_prod.default.sample1 ADD COLUMN new_column2 bigint FIRST4修改列 修改列名 ALTER TABLE hadoop_prod.default.sample1 RENAME COLUMN data TO data1Alter Column修改类型只允许安全的转换 ALTER TABLE hadoop_prod.default.sample1 ADD COLUMNS (idd int) ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN idd TYPE bigintAlter Column 修改列的注释 ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id TYPE double COMMENT a ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id COMMENT bAlter Column修改列的顺序 ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id FIRST ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN new_column2 AFTER new_column1Alter Column修改列是否允许为null ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id DROP NOT NULLALTER COLUMN不用于更新struct类型。使用ADD COLUMN和DROP COLUMN添加或删除struct类型的字段。 5删除列 ALTER TABLE hadoop_prod.default.sample1 DROP COLUMN idd ALTER TABLE hadoop_prod.default.sample1 DROP COLUMN point.z6添加分区Spark3需要配置扩展 vim spark-default.conf spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions重新进入spark-sql shell ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD category ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD bucket(16, id) ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD truncate(data, 4) ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD years(ts)ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD bucket(16, id) AS shard7删除分区Spark3需要配置扩展 ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD category ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD bucket(16, id) ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD truncate(data, 4) ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD years(ts) ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD shard注意尽管删除了分区但列仍然存在于表结构中。 删除分区字段是元数据操作不会改变任何现有的表数据。新数据将被写入新的分区但现有数据将保留在旧的分区布局中。 当分区发生变化时动态分区覆盖行为也会发生变化。例如如果按天划分分区而改为按小时划分分区那么覆盖将覆盖每小时划分的分区而不再覆盖按天划分的分区。 删除分区字段时要小心可能导致元数据查询失败或产生不同的结果。 8修改分区Spark3需要配置扩展 ALTER TABLE hadoop_prod.default.sample1 REPLACE PARTITION FIELD bucket(16, id) WITH bucket(8, id)9修改表的写入顺序 ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category, id ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category ASC, id DESC ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST表写顺序不能保证查询的数据顺序。它只影响数据写入表的方式。 WRITE ORDERED BY设置了一个全局排序即跨任务的行排序就像在INSERT命令中使用ORDER BY一样: INSERT INTO hadoop_prod.default.sample1 SELECT id, data, category, ts FROM another_table ORDER BY ts, category要在每个任务内排序而不是跨任务排序使用local ORDERED BY: ALTER TABLE hadoop_prod.default.sample1 WRITE LOCALLY ORDERED BY category, id10按分区并行写入 ALTER TABLE hadoop_prod.default.sample1 WRITE DISTRIBUTED BY PARTITION ALTER TABLE hadoop_prod.default.sample1 WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY category, id插入数据 CREATE TABLE hadoop_prod.default.a (id bigint,count bigint) USING icebergCREATE TABLE hadoop_prod.default.b (id bigint,count bigint,flag string) USING iceberg1Insert Into INSERT INTO hadoop_prod.default.a VALUES (1, 1), (2, 2), (3, 3); INSERT INTO hadoop_prod.default.b VALUES (1, 1, a), (2, 2, b), (4, 4, d);2MERGE INTO行级更新 MERGE INTO hadoop_prod.default.a t USING (SELECT * FROM hadoop_prod.default.b) u ON t.id u.id WHEN MATCHED AND u.flagb THEN UPDATE SET t.count t.count u.count WHEN MATCHED AND u.flaga THEN DELETE WHEN NOT MATCHED THEN INSERT (id,count) values (u.id,u.count)查询数据 1普通查询 SELECT count(1) as count, data FROM local.db.table GROUP BY data2查询元数据 // 查询表快照 SELECT * FROM hadoop_prod.default.a.snapshots// 查询数据文件信息 SELECT * FROM hadoop_prod.default.a.files// 查询表历史 SELECT * FROM hadoop_prod.default.a.history// 查询 manifest ELECT * FROM hadoop_prod.default.a.manifests存储过程 Procedures可以通过CALL从任何已配置的Iceberg Catalog中使用。所有Procedures都在namespace中。 1语法 按照参数名传参 CALL catalog_name.system.procedure_name(arg_name_2 arg_2, arg_name_1 arg_1)当按位置传递参数时如果结束参数是可选的则只有结束参数可以省略。 CALL catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n)2快照管理 回滚到指定的快照id CALL hadoop_prod.system.rollback_to_snapshot(default.a, 7601163594701794741)回滚到指定时间的快照 CALL hadoop_prod.system.rollback_to_timestamp(db.sample, TIMESTAMP 2021-06-30 00:00:00.000)设置表的当前快照ID CALL hadoop_prod.system.set_current_snapshot(db.sample, 1)从快照变为当前表状态 CALL hadoop_prod.system.cherrypick_snapshot(default.a, 7629160535368763452) CALL hadoop_prod.system.cherrypick_snapshot(snapshot_id 7629160535368763452, table default.a )3元数据管理 删除早于指定日期和时间的快照但保留最近100个快照 CALL hive_prod.system.expire_snapshots(db.sample, TIMESTAMP 2021-06-30 00:00:00.000, 100)删除Iceberg表中任何元数据文件中没有引用的文件 #列出所有需要删除的候选文件 CALL catalog_name.system.remove_orphan_files(table db.sample, dry_run true) #删除指定目录中db.sample表不知道的任何文件 CALL catalog_name.system.remove_orphan_files(table db.sample, location tablelocation/data)合并数据文件合并小文件 CALL catalog_name.system.rewrite_data_files(db.sample) CALL catalog_name.system.rewrite_data_files(table db.sample, strategy sort, sort_order id DESC NULLS LAST,name ASC NULLS FIRST) CALL catalog_name.system.rewrite_data_files(table db.sample, strategy sort, sort_order zorder(c1,c2)) CALL catalog_name.system.rewrite_data_files(table db.sample, options map(min-input-files,2)) CALL catalog_name.system.rewrite_data_files(table db.sample, where id 3 and name foo)重写表清单来优化执行计划 CALL catalog_name.system.rewrite_manifests(db.sample)#重写表db中的清单。并禁用Spark缓存的使用。这样做可以避免执行程序上的内存问题。 CALL catalog_name.system.rewrite_manifests(db.sample, false)4迁移表 快照 CALL catalog_name.system.snapshot(db.sample, db.snap) CALL catalog_name.system.snapshot(db.sample, db.snap, /tmp/temptable/)迁移 CALL catalog_name.system.migrate(spark_catalog.db.sample, map(foo, bar)) CALL catalog_name.system.migrate(db.sample)添加数据文件 CALL spark_catalog.system.add_files(table db.tbl,source_table db.src_tbl,partition_filter map(part_col_1, A) )CALL spark_catalog.system.add_files(table db.tbl,source_table parquet.path/to/table )5元数据信息 获取指定快照的父快照id CALL spark_catalog.system.ancestors_of(db.tbl)获取指定快照的所有祖先快照 CALL spark_catalog.system.ancestors_of(db.tbl, 1) CALL spark_catalog.system.ancestors_of(snapshot_id 1, table db.tbl)DataFrame 操作 环境准备 1创建maven工程配置pom文件 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.atguigu.iceberg/groupIdartifactIdspark-iceberg-demo/artifactIdversion1.0-SNAPSHOT/versionpropertiesscala.binary.version2.12/scala.binary.versionspark.version3.3.1/spark.versionmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.target/propertiesdependencies!-- Spark的依赖引入 --dependencygroupIdorg.apache.spark/groupIdartifactIdspark-core_${scala.binary.version}/artifactIdscopeprovided/scopeversion${spark.version}/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql_${scala.binary.version}/artifactIdscopeprovided/scopeversion${spark.version}/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-hive_${scala.binary.version}/artifactIdscopeprovided/scopeversion${spark.version}/version/dependency!--fastjson 1.2.80 存在安全漏洞,--dependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.83/version/dependency!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-runtime-3.3 --dependencygroupIdorg.apache.iceberg/groupIdartifactIdiceberg-spark-runtime-3.3_2.12/artifactIdversion1.1.0/version/dependency/dependenciesbuildplugins!-- assembly打包插件 --plugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-assembly-plugin/artifactIdversion3.0.0/versionexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executionsconfigurationarchivemanifest/manifest/archivedescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configuration/plugin!--Maven编译scala所需依赖--plugingroupIdnet.alchim31.maven/groupIdartifactIdscala-maven-plugin/artifactIdversion3.2.2/versionexecutionsexecutiongoalsgoalcompile/goalgoaltestCompile/goal/goals/execution/executions/plugin/plugins/build /project2配置Catalog val spark: SparkSession SparkSession.builder().master(local).appName(this.getClass.getSimpleName)//指定hive catalog, catalog名称为iceberg_hive.config(spark.sql.catalog.iceberg_hive, org.apache.iceberg.spark.SparkCatalog).config(spark.sql.catalog.iceberg_hive.type, hive).config(spark.sql.catalog.iceberg_hive.uri, thrift://hadoop1:9083)// .config(iceberg.engine.hive.enabled, true)//指定hadoop catalogcatalog名称为iceberg_hadoop .config(spark.sql.catalog.iceberg_hadoop, org.apache.iceberg.spark.SparkCatalog).config(spark.sql.catalog.iceberg_hadoop.type, hadoop).config(spark.sql.catalog.iceberg_hadoop.warehouse, hdfs://hadoop1:8020/warehouse/spark-iceberg).getOrCreate()读取表 1加载表 spark.read .format(iceberg) .load(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a) .show()或 // 仅支持Spark3.0以上 spark.table(iceberg_hadoop.default.a) .show()2时间旅行指定时间查询 spark.read.option(as-of-timestamp, 499162860000).format(iceberg) .load(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a) .show()3时间旅行指定快照id查询 spark.read.option(snapshot-id, 7601163594701794741L).format(iceberg) .load(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a) .show()4增量查询 spark.read .format(iceberg) .option(start-snapshot-id, 10963874102873) .option(end-snapshot-id, 63874143573109) .load(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a) .show()查询的表只能是append的方式写数据不支持replace, overwrite, delete操作。 检查表 1查询元数据 spark.read.format(iceberg).load(iceberg_hadoop.default.a.files) spark.read.format(iceberg).load(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a#files)2元数据表时间旅行查询 spark.read .format(iceberg) .option(snapshot-id, 7601163594701794741L) .load(iceberg_hadoop.default.a.files)写入表 1创建样例类准备DF case class Sample(id:Int,data:String,category:String)val df: DataFrame spark.createDataFrame(Seq(Sample(1,A, a), Sample(2,B, b), Sample(3,C, c)))2插入数据并建表 df.writeTo(iceberg_hadoop.default.table1).create()import spark.implicits._ df.writeTo(iceberg_hadoop.default.table1).tableProperty(write.format.default, orc).partitionedBy($category).createOrReplace()3append追加 df.writeTo(iceberg_hadoop.default.table1).append()4动态分区覆盖 df.writeTo(iceberg_hadoop.default.table1).overwritePartitions()5静态分区覆盖 import spark.implicits._ df.writeTo(iceberg_hadoop.default.table1).overwrite($category c)6插入分区表且分区内排序 df.sortWithinPartitions(category).writeTo(iceberg_hadoop.default.table1).append()维护表 1获取Table对象 1HadoopCatalog import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier;val conf new Configuration() val catalog new HadoopCatalog(conf,hdfs://hadoop1:8020/warehouse/spark-iceberg) val table: Table catalog.loadTable(TableIdentifier.of(db,table1))2HiveCatalog import org.apache.iceberg.hive.HiveCatalog; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier;val catalog new HiveCatalog() catalog.setConf(spark.sparkContext.hadoopConfiguration)val properties new util.HashMap[String,String]() properties.put(warehouse, hdfs://hadoop1:8020/warehouse/spark-iceberg) properties.put(uri, thrift://hadoop1:9083)catalog.initialize(hive, properties) val table: Table catalog.loadTable(TableIdentifier.of(db, table1))2快照过期清理 每次写入Iceberg表都会创建一个表的新快照或版本。快照可以用于时间旅行查询或者可以将表回滚到任何有效的快照。建议设置快照过期时间过期的旧快照将从元数据中删除不再可用于时间旅行查询。 // 1天过期时间 val tsToExpire: Long System.currentTimeMillis() - (1000 * 60 * 60 * 24)table.expireSnapshots().expireOlderThan(tsToExpire).commit()或使用SparkActions来设置过期 //SparkActions可以并行运行大型表的表过期设置 SparkActions.get().expireSnapshots(table).expireOlderThan(tsToExpire).execute()3删除无效文件 在Spark和其他分布式处理引擎中任务或作业失败可能会留下未被表元数据引用的文件在某些情况下正常的快照过期可能无法确定不再需要并删除该文件。 SparkActions.get().deleteOrphanFiles(table).execute()4合并小文件 数据文件过多会导致更多的元数据存储在清单文件中而较小的数据文件会导致不必要的元数据量和更低效率的文件打开成本。 SparkActions.get().rewriteDataFiles(table).filter(Expressions.equal(category, a)).option(target-file-size-bytes, 1024L.toString) //1KB.execute()与 Flink SQL 集成 Apache Iceberg同时支持Apache Flink的DataStream API和Table API。 环境准备 1安装 Flink 1Flink与Iceberg的版本对应关系如下 Flink 版本Iceberg 版本1.110.9.0 – 0.12.11.120.12.0 – 0.13.11.130.13.0 – 1.0.01.140.13.0 – 1.1.01.150.14.0 – 1.1.01.161.1.0 – 1.1.0 2上传并解压Flink安装包 tar -zxvf flink-1.16.0-bin-scala_2.12.tgz -C /opt/module/3配置环境变量 sudo vim /etc/profile.d/my_env.sh export HADOOP_CLASSPATHhadoop classpath source /etc/profile.d/my_env.sh4拷贝iceberg的jar包到Flink的lib目录 cp /opt/software/iceberg/iceberg-flink-runtime-1.16-1.1.0.jar /opt/module/flink-1.16.0/lib2启动 Hadoop 3启动 sql-client 1修改flink-conf.yaml配置 vim /opt/module/flink-1.16.0/conf/flink-conf.yamlclassloader.check-leaked-classloader: false taskmanager.numberOfTaskSlots: 4state.backend: rocksdb execution.checkpointing.interval: 30000 state.checkpoints.dir: hdfs://hadoop1:8020/ckps state.backend.incremental: true2local模式 1修改workers vim /opt/module/flink-1.16.0/conf/workers #表示会在本地启动3个TaskManager的 local集群 localhost localhost localhost2启动Flink /opt/module/flink-1.16.0/bin/start-cluster.sh查看webuihttp://hadoop1:8081 3启动Flink的sql-client /opt/module/flink-1.16.0/bin/sql-client.sh embedded创建和使用 Catalog 语法说明 CREATE CATALOG catalog_name WITH (typeiceberg,config_keyconfig_value ); type: 必须是iceberg。必须 catalog-type: 内置了hive和hadoop两种catalog也可以使用catalog-impl来自定义catalog。可选 catalog-impl: 自定义catalog实现的全限定类名。如果未设置catalog-type则必须设置。可选 property-version: 描述属性版本的版本号。此属性可用于向后兼容以防属性格式更改。当前属性版本为1。可选 cache-enabled: 是否启用目录缓存默认值为true。可选 cache.expiration-interval-ms: 本地缓存catalog条目的时间(以毫秒为单位)负值如-1表示没有时间限制不允许设为0。默认值为-1。可选 Hive Catalog 1上传hive connector到flink的lib中 cp flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar /opt/module/flink-1.16.0/lib/2启动hive metastore服务 hive --service metastore3创建hive catalog 重启flink集群重新进入sql-client CREATE CATALOG hive_catalog WITH (typeiceberg,catalog-typehive,urithrift://hadoop1:9083,clients5,property-version1,warehousehdfs://hadoop1:8020/warehouse/iceberg-hive );use catalog hive_catalog;use catalog hive_catalog; uri: Hive metastore的thrift uri。(必选) clients:Hive metastore客户端池大小默认为2。(可选) warehouse: 数仓目录。 hive-conf-dir:包含hive-site.xml配置文件的目录路径hive-site.xml中hive.metastore.warehouse.dir 的值会被warehouse覆盖。 hadoop-conf-dir:包含core-site.xml和hdfs-site.xml配置文件的目录路径。 Hadoop Catalog Iceberg还支持HDFS中基于目录的catalog可以使用’catalog-type’hadoop’配置。 CREATE CATALOG hadoop_catalog WITH (typeiceberg,catalog-typehadoop,warehousehdfs://hadoop1:8020/warehouse/iceberg-hadoop,property-version1 );use catalog hadoop_catalog;warehouse:存放元数据文件和数据文件的HDFS目录。必需 配置sql-client初始化文件 vim /opt/module/flink-1.16.0/conf/sql-client-init.sqlCREATE CATALOG hive_catalog WITH (typeiceberg,catalog-typehive,urithrift://hadoop1:9083,warehousehdfs://hadoop1:8020/warehouse/iceberg-hive );USE CATALOG hive_catalog;后续启动sql-client时加上 -i sql文件路径 即可完成catalog的初始化。 /opt/module/flink-1.16.0/bin/sql-client.sh embedded -i conf/sql-client-init.sqlDDL 语句 创建数据库 CREATE DATABASE iceberg_db; USE iceberg_db;创建表 CREATE TABLE hive_catalog.default.sample (id BIGINT COMMENT unique id,data STRING );建表命令现在支持最常用的flink建表语法包括: PARTITION BY (column1, column2, …)配置分区apache flink还不支持隐藏分区。 COMMENT ‘table document’指定表的备注 WITH (‘key’‘value’, …)设置表属性 目前不支持计算列、watermark支持主键。 1创建分区表 CREATE TABLE hive_catalog.default.sample (id BIGINT COMMENT unique id,data STRING ) PARTITIONED BY (data);Apache Iceberg支持隐藏分区但Apache flink不支持在列上通过函数进行分区现在无法在flink DDL中支持隐藏分区。 2使用LIKE语法建表 LIKE语法用于创建一个与另一个表具有相同schema、分区和属性的表。 CREATE TABLE hive_catalog.default.sample (id BIGINT COMMENT unique id,data STRING );CREATE TABLE hive_catalog.default.sample_like LIKE hive_catalog.default.sample;修改表 1修改表属性 ALTER TABLE hive_catalog.default.sample SET (write.format.defaultavro);2修改表名 ALTER TABLE hive_catalog.default.sample RENAME TO hive_catalog.default.new_sample;删除表 DROP TABLE hive_catalog.default.sample;插入语句 INSERT INTO INSERT INTO hive_catalog.default.sample VALUES (1, a); INSERT INTO hive_catalog.default.sample SELECT id, data from sample2;INSERT OVERWRITE 仅支持Flink的Batch模式 SET execution.runtime-mode batch; INSERT OVERWRITE sample VALUES (1, a); INSERT OVERWRITE hive_catalog.default.sample PARTITION(dataa) SELECT 6;UPSERT 当将数据写入v2表格式时Iceberg支持基于主键的UPSERT。有两种方法可以启用upsert。 1建表时指定 CREATE TABLE hive_catalog.test1.sample5 (id INT UNIQUE COMMENT unique id,data STRING NOT NULL,PRIMARY KEY(id) NOT ENFORCED ) with (format-version2, write.upsert.enabledtrue );2插入时指定 INSERT INTO tableName /* OPTIONS(upsert-enabledtrue) */ ...插入的表format-version需要为2。 OVERWRITE和UPSERT不能同时设置。在UPSERT模式下如果对表进行分区则分区字段必须也是主键。 3读取Kafka流upsert插入到iceberg表中 create table default_catalog.default_database.kafka(id int,data string ) with (connector kafka,topic test111,properties.zookeeper.connect hadoop1:2181,properties.bootstrap.servers hadoop1:9092,format json,properties.group.idiceberg,scan.startup.modeearliest-offset );INSERT INTO hive_catalog.test1.sample5 SELECT * FROM default_catalog.default_database.kafka;查询语句 Iceberg支持Flink的流式和批量读取。 Batch模式 SET execution.runtime-mode batch; select * from sample;Streaming模式 SET execution.runtime-mode streaming; SET table.dynamic-table-options.enabledtrue; SET sql-client.execution.result-modetableau;1从当前快照读取所有记录然后从该快照读取增量数据 SELECT * FROM sample5 /* OPTIONS(streamingtrue, monitor-interval1s)*/ ;2读取指定快照id不包含后的增量数据 SELECT * FROM sample /* OPTIONS(streamingtrue, monitor-interval1s, start-snapshot-id3821550127947089987)*/ ;monitor-interval: 连续监控新提交数据文件的时间间隔默认为10s。 start-snapshot-id: 流作业开始的快照id。 **注意**如果是无界数据流式upsert进iceberg表读kafkaupsert进iceberg表那么再去流读iceberg表会存在读不出数据的问题。如果无界数据流式append进iceberg表读kafkaappend进iceberg表那么流读该iceberg表可以正常看到结果。 与Flink集成的不足 支持的特性Flink备注SQL create catalog√SQL create database√SQL create table√SQL create table like√SQL alter table√只支持修改表属性不支持更改列和分区SQL drop_table√SQL select√支持流式和批处理模式SQL insert into√支持流式和批处理模式SQL insert overwrite√DataStream read√DataStream append√DataStream overwrite√Metadata tables支持Java API不支持Flink SQLRewrite files action√ 不支持创建隐藏分区的Iceberg表。 不支持创建带有计算列的Iceberg表。 不支持创建带watermark的Iceberg表。 不支持添加列删除列重命名列更改列。 Iceberg目前不支持Flink SQL 查询表的元数据信息需要使用Java API 实现。 与 Flink DataStream 集成 环境准备 1配置pom文件 新建Maven工程pom文件配置如下 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.atguigu.iceberg/groupIdartifactIdflink-iceberg-demo/artifactIdversion1.0-SNAPSHOT/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetflink.version1.16.0/flink.versionjava.version1.8/java.versionscala.binary.version2.12/scala.binary.versionslf4j.version1.7.30/slf4j.version/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/versionscopeprovided/scope !--不会打包到依赖中只参与编译不参与运行 --/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency!--idea运行时也有webui--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-runtime-web/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion${slf4j.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion${slf4j.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-to-slf4j/artifactIdversion2.14.0/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-rocksdb/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion3.1.3/versionscopeprovided/scope/dependency!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-runtime-1.16 --dependencygroupIdorg.apache.iceberg/groupIdartifactIdiceberg-flink-runtime-1.16/artifactIdversion1.1.0/version/dependency/dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion3.2.4/versionexecutionsexecutionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationartifactSetexcludesexcludecom.google.code.findbugs:jsr305/excludeexcludeorg.slf4j:*/excludeexcludelog4j:*/excludeexcludeorg.apache.hadoop:*/exclude/excludes/artifactSetfiltersfilter!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --artifact*:*/artifactexcludesexcludeMETA-INF/*.SF/excludeexcludeMETA-INF/*.DSA/excludeexcludeMETA-INF/*.RSA/exclude/excludes/filter/filterstransformers combine.childrenappendtransformerimplementationorg.apache.maven.plugins.shade.resource.ServicesResourceTransformer/transformer/transformers/configuration/execution/executions/plugin/plugins/build /project2配置log4j resources目录下新建log4j.properties。 log4j.rootLoggererror,stdout log4j.appender.stdoutorg.apache.log4j.ConsoleAppender log4j.appender.stdout.targetSystem.out log4j.appender.stdout.layoutorg.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern%d %p [%c] - %m%n读取数据 常规Source写法 1Batch方式 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); TableLoader tableLoader TableLoader.fromHadoopTable(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a); DataStreamRowData batch FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();batch.map(r - Tuple2.of(r.getLong(0),r.getLong(1) )).returns(Types.TUPLE(Types.LONG,Types.LONG)).print();env.execute(Test Iceberg Read);2Streaming方式 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); TableLoader tableLoader TableLoader.fromHadoopTable(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a); DataStreamRowData stream FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(true).startSnapshotId(3821550127947089987L).build();stream.map(r - Tuple2.of(r.getLong(0),r.getLong(1) )).returns(Types.TUPLE(Types.LONG,Types.LONG)).print();env.execute(Test Iceberg Read);FLIP-27 Source写法 1Batch方式 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); TableLoader tableLoader TableLoader.fromHadoopTable(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a);IcebergSourceRowData source1 IcebergSource.forRowData().tableLoader(tableLoader).assignerFactory(new SimpleSplitAssignerFactory()).build();DataStreamRowData batch env.fromSource(Source1,WatermarkStrategy.noWatermarks(),My Iceberg Source,TypeInformation.of(RowData.class));batch.map(r - Tuple2.of(r.getLong(0), r.getLong(1))).returns(Types.TUPLE(Types.LONG, Types.LONG)).print();env.execute(Test Iceberg Read);2Streaming方式 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); TableLoader tableLoader TableLoader.fromHadoopTable(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a);IcebergSource source2 IcebergSource.forRowData().tableLoader(tableLoader).assignerFactory(new SimpleSplitAssignerFactory()).streaming(true).streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT).monitorInterval(Duration.ofSeconds(60)).build();DataStreamRowData stream env.fromSource(Source2,WatermarkStrategy.noWatermarks(),My Iceberg Source,TypeInformation.of(RowData.class));stream.map(r - Tuple2.of(r.getLong(0), r.getLong(1))).returns(Types.TUPLE(Types.LONG, Types.LONG)).print();env.execute(Test Iceberg Read);写入数据 目前支持DataStreamRowData和DataStreamRow格式的数据流写入Iceberg表。 1写入方式支持 append、overwrite、upsert StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);SingleOutputStreamOperatorRowData input env.fromElements().map(new MapFunctionString, RowData() {Overridepublic RowData map(String s) throws Exception {GenericRowData genericRowData new GenericRowData(2);genericRowData.setField(0, 99L);genericRowData.setField(1, 99L);return genericRowData;}});TableLoader tableLoader TableLoader.fromHadoopTable(hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a);FlinkSink.forRowData(input).tableLoader(tableLoader).append() // append方式//.overwrite(true) // overwrite方式//.upsert(true) // upsert方式;env.execute(Test Iceberg DataStream);2写入选项 FlinkSink.forRowData(input).tableLoader(tableLoader).set(write-format, orc).set(FlinkWriteOptions.OVERWRITE_MODE, true);可配置选项如下 选项默认值说明write-formatParquet同write.format.default写入操作使用的文件格式Parquet, avro或orctarget-file-size-bytes536870912512MB同write.target-file-size-bytes控制生成的文件的大小目标大约为这么多字节upsert-enabled同write.upsert.enabledoverwrite-enabledfalse覆盖表的数据不能和UPSERT模式同时开启distribution-modeNone同 write.distribution-mode定义写数据的分布方式: none:不打乱行; hash:按分区键散列分布;range如果表有SortOrder则通过分区键或排序键分配compression-codec同 write.(fileformat).compression-codeccompression-level同 write.(fileformat).compression-levelcompression-strategy同write.orc.compression-strategy 合并小文件 Iceberg现在不支持在flink sql中检查表需要使用Iceberg提供的Java API来读取元数据来获得表信息。可以通过提交Flink批处理作业将小文件重写为大文件 import org.apache.iceberg.flink.actions.Actions;// 1.获取 Table对象 // 1.1 创建 catalog对象 Configuration conf new Configuration(); HadoopCatalog hadoopCatalog new HadoopCatalog(conf, hdfs://hadoop1:8020/warehouse/spark-iceberg);// 1.2 通过 catalog加载 Table对象 Table table hadoopCatalog.loadTable(TableIdentifier.of(default, a));// 有Table对象就可以获取元数据、进行维护表的操作 // System.out.println(table.history()); // System.out.println(table.expireSnapshots().expireOlderThan());// 2.通过 Actions 来操作 合并 Actions.forTable(table).rewriteDataFiles().targetSizeInBytes(1024L).execute();得到Table对象就可以获取元数据、进行维护表的操作。更多Iceberg提供的API操作考https://iceberg.apache.org/docs/latest/api/
http://www.zqtcl.cn/news/539922/

相关文章:

  • 企业网站建设可以分为几个层次三亚网站定制
  • 手机网站可以做商城吗如何为公司建立网站
  • 淄博建设银行网站怎么做盗号网站手机
  • 网站建设推广的10种方法精美个人网站
  • 西安专业承接网站搭建模板网站聚合页
  • 便宜网站建设加盟推广公司
  • 手机移动端网站怎么做三维建设项目管理网站
  • 如何把网站设为正确建设中广东学校网站建设公司
  • 企业型网站建设怎样收费dw制作网站模板
  • 自适应网站欣赏医联体网站建设
  • 南安市住房和城乡建设部网站微商城网站建设行情
  • 网站开发的前景wordpress倒闭
  • 合肥网站建设网页设计免费推广渠道有哪些方式
  • 广州电力建设有限公司网站按月网站建设
  • 做网站客户会问什么问题手机如何制作网页链接
  • 做足球直播网站wordpress筛选框
  • 做网站需求文档深圳站建在边境
  • 网站建设法规浙江建设信息港证书查询
  • 影视作品网站开发与设计网站建设教程简笔画
  • 自己可以给公司做网站吗网站建设 用ftp上传文件
  • 电子商务网站开发与管理网站建设的设备
  • 网站建设项目公司沈阳网站关键字优化
  • 可以做淘宝联盟的免费网站优质国外网站
  • 石家庄营销型网站建设公司服装公司网站源码
  • 网站开发的软硬件需求做网站盘锦
  • 创意网站建设排行榜python和php哪个做网站
  • 开锁做网站怎么样榆林网站开发公司
  • 松原市建设局网站苏州网站建设-中国互联
  • 标书制作教程视频网站福田祥菱v1单排
  • 点网站出图片怎么做能看人与动物做的网站