asp.net 网站安装包,网站搜索引擎拓客,成都微信小程序制作价格,扁平化购物网站模板1、说明
使用flink实时的读取kafka的数据#xff0c;并且实时的存储到iceberg中。好处是可以一边存数据#xff0c;一边查询数据。当然使用clickhouse也可以实现数据的既存既取。而hive数据既存既读则会有问题。iceberg中数据读写数据都是从快照中开始的#xff0c;读和写对…1、说明
使用flink实时的读取kafka的数据并且实时的存储到iceberg中。好处是可以一边存数据一边查询数据。当然使用clickhouse也可以实现数据的既存既取。而hive数据既存既读则会有问题。iceberg中数据读写数据都是从快照中开始的读和写对应的不同快照所以读写互不影响。而hive中写的时候数据就不能读。
下面是使用flink读取kafka数据存储到iceberg的例子。本案例可以直接在本地直接运行无需搭建hadoophive集群。其中遇到的问题及解决思路。用到kafka可以直接使用docker来搞一个跑起来。
2、实现步骤
1确保flink和iceberg的版本对应
这里使用的是flink1.13.5iceberg: 0.12.1
2) 创建流式执行环境 使用getExecutionEnvironment的静态方法可以自动识别是本地环境还是集群服务环境。当然也可以使用createLocalEnvironment()方法创建本地环境。 该环境变量类似于上下文可以配置一些基本的信息如并行度默认是CPU数、检查时间间隔默认不检查。 这里设置检查为5000毫秒到检查时间的时候 ,Flink向Iceberg中写入数据时当checkpoint发生后才会commit数据必须设置checkpoint
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);3指定kafka数据源创建数据流 设置读取kafka的一些参数值的序列化使用的是字符串序列化模式SimpleStringSchema开始读取kafka数据偏移量设置则从最新的数据开始读取latest。 从指定的source源中创建一个数据流。这里的 source 可能是Kafka读取数据,当然还可以从其他外部源如socket、Kinesis 或其他数据源读取的数据。WatermarkStrategy.noWatermarks(),这是一个水印策略。用于处理事件时间event-time和处理有延迟或乱序的数据。WatermarkStrategy.noWatermarks() 表示我们不想为这个数据流生成任何水印。这意味着我们可能是在做纯粹的基于处理时间processing-time的流处理或者我们不关心事件时间的顺序。kafka_source,这是给这个数据源分配的名称主要用于日志记录和调试。 KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(node1:9094,node2:9094).setTopics(topic_users).setGroupId(flink-test-1).setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSourceString kafkaDs env.fromSource(source, WatermarkStrategy.noWatermarks(), kafka_source);4流式数据转换器
上一步获取到的数据流将数据流中的json字符串转为RowData . 当然输入的数据格式可能是其它格式根据实际情况进行修改。注意GenericRowData中字符串格式必须使用 StringData.fromString(session.userId)来转换一下否则无法存储到iceberg表中。 SingleOutputStreamOperatorRowData dataStream kafkaDs.map((MapFunctionString, RowData) value - {Gson gson new Gson();Sessions session gson.fromJson(value, Sessions.class);GenericRowData row new GenericRowData(9);row.setField(0, session.version);row.setField(1, StringData.fromString(session.userId));row.setField(2, StringData.fromString(session.appType));row.setField(3, session.loginTime);row.setField(4, StringData.fromString(session.clientIp));row.setField(5, StringData.fromString(session.service));row.setField(6, session.status);row.setField(7, StringData.fromString(session.channel));row.setField(8, StringData.fromString(timeStamp2DateStr(session.loginTime)));return row;});5创建表创建Catalog、表Id表Schema、表分区、表文件存储格式等
这里内容看起来比较多但也就是一件事建表。 创建 Hadoop 配置: 加载系统默认配置core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml 创建 Iceberg 的 Hadoop Catalog: 用来管理表创建、删除以及元数据存储位置。这里使用hadoop存储元数据和实际数据。 指定表标识符: 对应数据库名称和表的名称。对应的数据也会存在iceberg_db/flink_iceberg_tbl目录下。 定义表的 Schema: 这部分代码定义了表的结构指定了每个字段的名称和类型。 定义分区: PartitionSpec.builderFor(schema).identity(hour_p).build();按照hour_p字段来分区数据. 如果不见分许则使用 PartitionSpec.unpartitioned()
PartitionSpec spec PartitionSpec.builderFor(schema).identity(hour_p).build();设置表属性: 设置了默认的文件格式为PARQUET。 检查表是否存在如果不存在则创建 创建 TableLoader: TableLoader是用于加载Iceberg表的工具这个目录是本地的数据存储的目录以及数据库、数据表对应的名称。如果使用hdfs则改为相应目录结构即可。
TableLoader tableLoader TableLoader.fromHadoopTable(data/flinkwarehouse/iceberg_db/flink_iceberg_tbl);// 创建默认的配置类会自动加载hadoop相关的配置文件Configuration hadoopConf new Configuration();// 设置Catalog的存储位置Catalog catalog new HadoopCatalog(hadoopConf, data/flinkwarehouse);// iceberg 数据库名称数据表名称TableIdentifier name TableIdentifier.of(iceberg_db, flink_iceberg_tbl);// 数据表明模式以及字段名称字段类型Schema schema new Schema(Types.NestedField.required(1, version, Types.IntegerType.get()),Types.NestedField.required(2, userId, Types.StringType.get()),Types.NestedField.required(3, appType, Types.StringType.get()),Types.NestedField.required(4, loginTime, Types.LongType.get()),Types.NestedField.required(5, clientIp, Types.StringType.get()),Types.NestedField.required(6, service, Types.StringType.get()),Types.NestedField.required(7, status, Types.IntegerType.get()),Types.NestedField.required(8, channel, Types.StringType.get()),Types.NestedField.required(9, hour_p, Types.StringType.get()));// 设置分区 PartitionSpec spec PartitionSpec.unpartitioned();PartitionSpec spec PartitionSpec.builderFor(schema).identity(hour_p).build();// 设置 默认文件存储格式 parquet格式MapString, String props ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());Table table null;if (!catalog.tableExists(name)) {table catalog.createTable(name, schema, spec, props);} else {table catalog.loadTable(name);}TableLoader tableLoader TableLoader.fromHadoopTable(data/flinkwarehouse/iceberg_db/flink_iceberg_tbl);
6创建FlinkSink读取流式数据存储到表中
接收一个数据流指定相应的表数据写入加载器中
7执行环境开始执行任务
这里给任务一个简短的描述。 env.execute(iceberg api and flink );3、全部代码如下
1build.gradle依赖如下配置
def flink [version: 1.13.5
]
def hadoop [version: 3.2.2
]def iceberg [version: 0.12.1
]dependencies {implementation com.alibaba.ververica:ververica-connector-iceberg:1.13-vvr-4.0.7implementation org.apache.iceberg:iceberg-flink-runtime:${iceberg.version}implementation org.apache.flink:flink-java:${flink.version}implementation org.apache.flink:flink-streaming-java_2.11:${flink.version}implementation org.apache.flink:flink-clients_2.11:${flink.version}implementation org.apache.flink:flink-streaming-scala_2.11:${flink.version}implementation org.apache.flink:flink-connector-kafka_2.11:${flink.version}implementation org.apache.flink:flink-connector-base:${flink.version}implementation org.apache.hadoop:hadoop-client:${hadoop.version}implementation org.apache.flink:flink-table-runtime-blink_2.11:${flink.version}implementation org.apache.flink:flink-table:${flink.version}implementation org.apache.flink:flink-table-common:${flink.version}implementation org.apache.flink:flink-table-api-java:${flink.version}implementation org.apache.flink:flink-table-api-java-bridge_2.11:${flink.version}implementation org.apache.flink:flink-table-planner_2.11:${flink.version}implementation org.apache.flink:flink-table-planner-blink_2.11:${flink.version}testImplementation junit:junit:4.11// log4j and slf4j dependencies testImplementation org.slf4j:slf4j-log4j12:1.7.25testImplementation log4j:log4j:1.2.17implementation org.slf4j:slf4j-api:1.7.25testImplementation org.slf4j:slf4j-nop:1.7.25testImplementation org.slf4j:slf4j-simple:1.7.5implementation com.google.code.gson:gson:2.3.1// 没有这个会出现报错使用compilecompile com.google.guava:guava:28.2-jre}2案例代码
package com.subao.flink;import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;public class FlinkIceberg {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// Flink向Iceberg中写入数据时当checkpoint发生后才会commit数据必须设置checkpointenv.enableCheckpointing(5000);KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(124.70.194.33:9094,124.71.180.217:9094).setTopics(sessions).setGroupId(flink-test-1).setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSourceString kafkaDs env.fromSource(source, WatermarkStrategy.noWatermarks(), kafka_source);SingleOutputStreamOperatorRowData dataStream kafkaDs.map((MapFunctionString, RowData) value - {Gson gson new Gson();Sessions session gson.fromJson(value, Sessions.class);GenericRowData row new GenericRowData(9);row.setField(0, session.version);row.setField(1, StringData.fromString(session.userId));row.setField(2, StringData.fromString(session.appType));row.setField(3, session.loginTime);row.setField(4, StringData.fromString(session.clientIp));row.setField(5, StringData.fromString(session.service));row.setField(6, session.status);row.setField(7, StringData.fromString(session.channel));row.setField(8, StringData.fromString(timeStamp2DateStr(session.loginTime)));System.out.println(row);return row;});// 创建默认的配置类会自动加载hadoop相关的配置文件Configuration hadoopConf new Configuration();// 设置Catalog的存储位置Catalog catalog new HadoopCatalog(hadoopConf, data/flinkwarehouse);// iceberg 数据库名称数据表名称TableIdentifier name TableIdentifier.of(iceberg_db, flink_iceberg_tbl);// 数据表明模式以及字段名称字段类型Schema schema new Schema(Types.NestedField.required(1, version, Types.IntegerType.get()),Types.NestedField.required(2, userId, Types.StringType.get()),Types.NestedField.required(3, appType, Types.StringType.get()),Types.NestedField.required(4, loginTime, Types.LongType.get()),Types.NestedField.required(5, clientIp, Types.StringType.get()),Types.NestedField.required(6, service, Types.StringType.get()),Types.NestedField.required(7, status, Types.IntegerType.get()),Types.NestedField.required(8, channel, Types.StringType.get()),Types.NestedField.required(9, hour_p, Types.StringType.get()));// 设置分区 PartitionSpec spec PartitionSpec.unpartitioned();PartitionSpec spec PartitionSpec.builderFor(schema).identity(hour_p).build();// 设置 默认文件存储格式 parquet格式MapString, String props ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());Table table null;if (!catalog.tableExists(name)) {table catalog.createTable(name, schema, spec, props);} else {table catalog.loadTable(name);}TableLoader tableLoader TableLoader.fromHadoopTable(data/flinkwarehouse/iceberg_db/flink_iceberg_tbl);FlinkSink.forRowData(dataStream).table(table).tableLoader(tableLoader).overwrite(false).build();env.execute(iceberg api and flink );}private static String timeStamp2DateStr(long timestamp) {// 将时间戳转为LocalDateTime对象LocalDateTime dateTime LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp * 1000), ZoneId.systemDefault());// 定义日期时间格式DateTimeFormatter formatter DateTimeFormatter.ofPattern(yyyyMMddHH);// 格式化日期时间return dateTime.format(formatter);}class Sessions {private int version;private String userId;private String appType;private long loginTime;private String clientIp;private String service;private int status;private String channel;public Sessions(int version, String userId, String appType, long loginTime, String clientIp, String service, int status, String channel) {this.version version;this.userId userId;this.appType appType;this.loginTime loginTime;this.clientIp clientIp;this.service service;this.status status;this.channel channel;}public int getVersion() {return version;}public void setVersion(int version) {this.version version;}public String getUserId() {return userId;}public void setUserId(String userId) {this.userId userId;}public String getAppType() {return appType;}public void setAppType(String appType) {this.appType appType;}public long getLoginTime() {return loginTime;}public void setLoginTime(long loginTime) {this.loginTime loginTime;}public String getClientIp() {return clientIp;}public void setClientIp(String clientIp) {this.clientIp clientIp;}public String getService() {return service;}public void setService(String service) {this.service service;}public int getStatus() {return status;}public void setStatus(int status) {this.status status;}public String getChannel() {return channel;}public void setChannel(String channel) {this.channel channel;}Overridepublic String toString() {return Sessions{ version version , userId userId \ , appType appType \ , loginTime loginTime , clientIp clientIp \ , service service \ , status status , channel channel \ };}}
}
4、遇到的问题
1找不到方法 java.lang.NoSuchMethodError:com.google.common.base.Preconditions.checkState。
Exception in thread main java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkState(ZLjava/lang/String;I)V这个主要是相应的com.google.guava的版本不兼容在build.gradle中加入如下即可
compile com.google.guava:guava:28.2-jre排查思路
a) 看到这个我就想应该是相应的包没有正常添加到依赖中。
可以百度一下看看是哪个包,发现是com.google.guava包。
b) 检查现有依赖中是否包含该包
因为使用的是gradle所以使用gradle的命令gradle dependencies,从结果中查看,发现使用的hadoop中有多个包版本不同最大的是27.0的包。
c) 尝试使用高版本的guava的包
compile com.google.guava:guava:28.2-jre经测试是可以的。但这里主要要使用compile关键字。
compile和implementation都表示依赖文件在引入项目的编译期、运行期和测试期使用。但是compile标识的包可被其它依赖包使用implementation则不可以。 compile 关键字会将依赖项传递到项目的所有模块和依赖项中。这意味着如果模块 A 依赖于模块 B并且模块 B 使用了compile 关键字引入了一些依赖项那么这些依赖项也会传递给模块 A。 implementation 关键字只将依赖项传递到当前模块中不会传递给依赖模块。这意味着如果模块 A 依赖于模块 B并且模块 B 使用了implementation 关键字引入了一些依赖项那么这些依赖项不会传递给模块 A。 由于compile 关键字会引入依赖的传递性可能导致不可预期的副作用和冲突。为了解决这个问题从 Gradle 3.4 版本开始建议使用implementation 关键字代替compile 关键字以减少依赖项传递引起的问题。 2) 数据文件为空
flink能够读取kafka的数据但是不能将数据写到文件中。怎么办于是问了问GPT它说看看日志。我没有配置log4j的日志配置当然看不到错误日志了。于是我就加上log4j.properties的配置文件。然后发现了如下错误
2023-08-08 19:38:16 DEBUG JobMaster:658 - Archive local failure causing attempt 8d8beb11aa845d873ebbc317b21bf435 to fail: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.flink.table.data.StringDataat org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$1(RowData.java:221)at org.apache.iceberg.flink.data.FlinkParquetWriters$RowDataWriter.get(FlinkParquetWriters.java:453)原来是数据格式不兼容的问题对于Flink中的RowData中使用的是StringData表示字符串因此在存入的时候需要把String类型转为StringData即可。如 row.setField(2, StringData.fromString(session.appType));总结flink读取kafak数据用户的应该是比较多的但是保存到iceberg中这个相对用的比较少。flink中比较常用flink-sql来处理和数据表相关的数据。