做网站需要服务器吗,自学it从哪里学起,p2p网站建设报价2p排名,南宁seo排名原理流程流程:
Flink消费Kafka,逻辑处理后将实时流转换为表视图,利用HiveCataLog创建Hive表,将实时流 表insert进Hive,注意分区时间字段需要为 yyyy-MM-dd形式,否则抛出异常:java.time.format.DateTimeParseException: Text 20240111 could not be parsed 写入到hive分区表
strea…流程流程:
Flink消费Kafka,逻辑处理后将实时流转换为表视图,利用HiveCataLog创建Hive表,将实时流 表insert进Hive,注意分区时间字段需要为 yyyy-MM-dd形式,否则抛出异常:java.time.format.DateTimeParseException: Text 20240111 could not be parsed 写入到hive分区表
streamEnv需要开启checkpoint保证flink写入hive分区表的写入一致性hive表ddl中需要指定以下TBLPROPERTIES
sink.partition-commit.trigger分区提交触发器单选可选值为partition-time、process-time(默认), 其中partition-time需要根据当前数据的watermark来判断分区是否需要提交当watermark delay大于等于分区上的时间时就会提交该分区元数据process-time的话根据当前系统处理时间来判断分区是否需要提交当系统处理时间大于等于分区上的时间就会提交该分区元数据partition.time-extractor.timestamp-pattern使用partition-time触发器时使用该配置项。表示从表字段中提取出表达某个分区的时间的格式需要提取到的时间必须为yyyy-MM-dd HH:mm:ss的格式。比如字段dt的格式为yyyy-MM-dd则配置为$dt 00:00:00则表示分区时间取值为dt的value的0点0分0秒可以选择多个表字段组合。当表字段无法抽取出符合的格式时则使用自定义提取器partition.time-extractor.class。sink.partition-commit.delay: 表示watermark允许event time的最大乱序时间使用partition-time触发器时可以使用默认为0ssink.partition-commit.policy.kind分区提交方式多选可选值为metastore、success-file、custommetastore表示写入元数据库success-file表示往hdfs分区目录写入一个标志文件custom表示使用自定义提交方式通常使用metastore,success-file组合partition.time-extractor.kind当要使用自定义分区时间提取器时需要配置此项值配置为custompartition.time-extractor.class当要使用自定义分区时间提取器时需要配置此项值配置为自定义提取器的类路径。在集群中运行时需要把该类打成jar包放到flink lib目录下。某个分区触发提交后后续再有此分区的数据进来仍然会写入hive该分区。 写入到hive非分区表 val streamEnv ...
val dataStream ...
val streamTableEnv ...
streamTableEnv.createTemporaryView(自定义catalog表名, dataStream, *fields) # 当前flink存在bug转换时必须指定fields或者schema否则watermark无法流入table
val catalog ...
streamTableEnv.registerCatalog(hive, catalog)
streamTableEnv.useCatalog(hive)
streamTableEnv.executeSql(insert sql).print() 参考:
flink - sink - hive - 简书 (jianshu.com)
5-flinkSQL参数 (gitee.io) 详见官网: Catalogs | Apache Flink