记录kafka-flink-kafka的end-to-end的exactly-once语义 步骤代码 步骤
开启checkpoint、stateBackend的设置和checkpoint配置设置kafka source的配置读取kafka source message随意的transformation；并打印结果kafka sink端的配置输出到kafka sink端执行
代码
pac… 记录kafka-flink-kafka的end-to-end的exactly-once语义 步骤代码 步骤
开启checkpoint、stateBackend的设置和checkpoint配置设置kafka source的配置读取kafka source message随意的transformation并打印结果kafka sink端的配置输出到kafka sink端执行
代码
package com.javaye.demo.exactly;import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;/*** Author: Java页大数据* Date: 2024-04-11:17:59* Describe:* kafka - flink - kafka 验证end-to-end的exactly once*/
public class ExactlyOnce {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 1.1. 开启checkpoint间隔为1000L msenv.enableCheckpointing(1000L);// 1.2. stateBackend:checkpoint持久化目录if (SystemUtils.IS_OS_WINDOWS) {env.setStateBackend(new FsStateBackend(file:///D:/ckp));} else {env.setStateBackend(new FsStateBackend(hdfs://only:9870/flink-checkpoints));}CheckpointConfig config env.getCheckpointConfig();
// 1.3. ckp的配置
// 1.3.1. 前后两次checkpoint的最小间隔:防止前后两次的checkpoint重叠config.setMinPauseBetweenCheckpoints(500L);
// 1.3.2. 容忍5次checkpoint失败config.setTolerableCheckpointFailureNumber(5);
// 1.3.3. job被取消时保留外部的checkpointconfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 1.3.4. 设置checkpoint的语义为 exactly-onceconfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 1.3.5. 设置checkpoint的超时时间若checkpoint超过该超时时间则说明该次checkpoint失败丢弃该checkpointconfig.setCheckpointTimeout(60 * 1000);
// 1.3.6. 设置同一时刻允许多少个checkpoint同时执行config.setMaxConcurrentCheckpoints(1);// 1.4. 设置重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 2. 设置kafka source的配置String kafkaServer only:9092;String sourceTopic flink_kafka_source;String groupId flink_kafka_source_exactly_once;String clientIdPrefix flink_exactly_once;Properties kafkaSourceProp new Properties();KafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(kafkaServer).setTopics(sourceTopic).setGroupId(groupId).setClientIdPrefix(clientIdPrefix).setStartingOffsets(OffsetsInitializer.latest()) // Start from latest offset.setProperty(partition.discovery.interval.ms, 50000) // discover new partitions per 50 seconds.setProperty(auto.offset.reset, latest).setValueOnlyDeserializer(new SimpleStringSchema())
// 执行checkpoint时提交offset到checkpointflink内部使用并且提交一份到默认主题__consumer_offsets
// .setCommitOffsetsOnCheckpoints(true) // checkpoint开启默认为true否则为false不支持该方法.setProperties(kafkaSourceProp).build();// 3. 读取kafka source messageDataStreamSourceString kafkaDS env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), flink_kafka_exactly_once, TypeInformation.of(String.class));// 4. 随意的transformationSingleOutputStreamOperatorString flatMapDS kafkaDS.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String value, CollectorString out) throws Exception {String[] words value.split(,);for (String word : words) {Random random new Random();int i random.nextInt(5);if (i 3) {System.out.println(模拟出现bug...);throw new RuntimeException(模拟出现bug...);}System.out.println(word i);out.collect(word i);}}});// 4.1. 打印结果容易观察flatMapDS.print();// 5. kafka sink端的配置Properties kafkaSinkProp new Properties();kafkaSinkProp.setProperty(transaction.timeout.ms, 1000 * 5 ); //设置事务超时时间也可在kafka配置中设置KafkaSinkString kafkaSink KafkaSink.Stringbuilder().setBootstrapServers(kafkaServer).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(flink_kafka_sink).setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setKafkaProducerConfig(kafkaSinkProp).build();// 6. 输出到kafka sink端flatMapDS.sinkTo(kafkaSink);// 7. 执行env.execute(ExactlyOnce.class.getName());}
}