当前位置: 首页 > news >正文

免费做cpa单页网站外贸网站推广收费

免费做cpa单页网站,外贸网站推广收费,百度代理查询,网站开发维护合同样板记录kafka-flink-kafka的end-to-end的exactly-once语义 步骤代码 步骤 开启checkpoint、stateBackend的设置和checkpoint配置设置kafka source的配置读取kafka source message随意的transformation#xff1b;并打印结果kafka sink端的配置输出到kafka sink端执行 代码 pac… 记录kafka-flink-kafka的end-to-end的exactly-once语义 步骤代码 步骤 开启checkpoint、stateBackend的设置和checkpoint配置设置kafka source的配置读取kafka source message随意的transformation并打印结果kafka sink端的配置输出到kafka sink端执行 代码 package com.javaye.demo.exactly;import org.apache.commons.lang3.SystemUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;import java.util.Properties; import java.util.Random; import java.util.concurrent.TimeUnit;/*** Author: Java页大数据* Date: 2024-04-11:17:59* Describe:* kafka - flink - kafka 验证end-to-end的exactly once*/ public class ExactlyOnce {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 1.1. 开启checkpoint间隔为1000L msenv.enableCheckpointing(1000L);// 1.2. stateBackend:checkpoint持久化目录if (SystemUtils.IS_OS_WINDOWS) {env.setStateBackend(new FsStateBackend(file:///D:/ckp));} else {env.setStateBackend(new FsStateBackend(hdfs://only:9870/flink-checkpoints));}CheckpointConfig config env.getCheckpointConfig(); // 1.3. ckp的配置 // 1.3.1. 前后两次checkpoint的最小间隔:防止前后两次的checkpoint重叠config.setMinPauseBetweenCheckpoints(500L); // 1.3.2. 容忍5次checkpoint失败config.setTolerableCheckpointFailureNumber(5); // 1.3.3. job被取消时保留外部的checkpointconfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 1.3.4. 设置checkpoint的语义为 exactly-onceconfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 1.3.5. 设置checkpoint的超时时间若checkpoint超过该超时时间则说明该次checkpoint失败丢弃该checkpointconfig.setCheckpointTimeout(60 * 1000); // 1.3.6. 设置同一时刻允许多少个checkpoint同时执行config.setMaxConcurrentCheckpoints(1);// 1.4. 设置重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 2. 设置kafka source的配置String kafkaServer only:9092;String sourceTopic flink_kafka_source;String groupId flink_kafka_source_exactly_once;String clientIdPrefix flink_exactly_once;Properties kafkaSourceProp new Properties();KafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(kafkaServer).setTopics(sourceTopic).setGroupId(groupId).setClientIdPrefix(clientIdPrefix).setStartingOffsets(OffsetsInitializer.latest()) // Start from latest offset.setProperty(partition.discovery.interval.ms, 50000) // discover new partitions per 50 seconds.setProperty(auto.offset.reset, latest).setValueOnlyDeserializer(new SimpleStringSchema()) // 执行checkpoint时提交offset到checkpointflink内部使用并且提交一份到默认主题__consumer_offsets // .setCommitOffsetsOnCheckpoints(true) // checkpoint开启默认为true否则为false不支持该方法.setProperties(kafkaSourceProp).build();// 3. 读取kafka source messageDataStreamSourceString kafkaDS env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), flink_kafka_exactly_once, TypeInformation.of(String.class));// 4. 随意的transformationSingleOutputStreamOperatorString flatMapDS kafkaDS.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String value, CollectorString out) throws Exception {String[] words value.split(,);for (String word : words) {Random random new Random();int i random.nextInt(5);if (i 3) {System.out.println(模拟出现bug...);throw new RuntimeException(模拟出现bug...);}System.out.println(word i);out.collect(word i);}}});// 4.1. 打印结果容易观察flatMapDS.print();// 5. kafka sink端的配置Properties kafkaSinkProp new Properties();kafkaSinkProp.setProperty(transaction.timeout.ms, 1000 * 5 ); //设置事务超时时间也可在kafka配置中设置KafkaSinkString kafkaSink KafkaSink.Stringbuilder().setBootstrapServers(kafkaServer).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(flink_kafka_sink).setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setKafkaProducerConfig(kafkaSinkProp).build();// 6. 输出到kafka sink端flatMapDS.sinkTo(kafkaSink);// 7. 执行env.execute(ExactlyOnce.class.getName());} }
http://www.zqtcl.cn/news/129959/

相关文章:

  • 淘宝网站模板是什么做的北海建设厅网站
  • 我想建个网站想做电商应该怎么入门
  • 广州番禺网站制作推广网站建设徐州百度网络网站
  • 有没有个人做网站的新网站做seo 的效果
  • 做网站和app哪个简单旅游资讯网站建设方案
  • 网站建设考级百度怎样可以搜到自己的网站
  • 自助建站免费自助建站网站广州企业网站
  • 常德论坛尚一网唐山seo网络推广
  • 网站建设预付流程网站设计风格的关键词
  • 常德网站制作怎么做自己的网页
  • 做的网站为什么图片看不了wordpress循环该分类子分类
  • 源码出售网站怎么做一个产品的网络营销方案
  • 安丘营销型网站建设国外教育网站模板
  • 做网站案例百度小说排行榜前十
  • 东昌网站建设公司上传到网站去的文档乱码
  • 如何制作自己的网站链接教程网络营销seo招聘
  • 网站制作资料收集wordpress资源网模板
  • 随州网站设计开发服务做网站制作步骤
  • 东莞凤岗做网站黄山旅游攻略住宿
  • 网站开发常用插件免费库存管理软件哪个好
  • 河池网站开发工程师招聘网如何做品牌运营与推广
  • 做网站运营难吗零基础网站建设教程
  • 深圳蚂蚁网络网站建设wordpress电影主题
  • 网站域名收费吗搜索引擎不收录网站
  • 海兴网站建设价格wordpress替代软件
  • 做网站哪家服务器好小区物业管理系统
  • 上海推广网站公司网站建设首选
  • 网站建设行业分析报告网站建设视频教程
  • 服装网站建设图企业网站建设开题报告是什么
  • 建设外贸商城网站制作网站建设的中期目标