当前位置: 首页 > news >正文

官方微网站威远移动网站建设

官方微网站,威远移动网站建设,在线网站设计,企业网站模块种类星光下的赶路人star的个人主页 世间真正温煦的春色#xff0c;都熨帖着大地#xff0c;潜伏在深谷 文章目录 1、输出算子#xff08;Sink#xff09;1.1 连接到外部系统1.2 输出到文件1.3 输出到Kafka1.4 输出到MySQL#xff08;JDBC#xff09;1.4 自定义Sink输出 1、输…                        星光下的赶路人star的个人主页 世间真正温煦的春色都熨帖着大地潜伏在深谷 文章目录 1、输出算子Sink1.1 连接到外部系统1.2 输出到文件1.3 输出到Kafka1.4 输出到MySQLJDBC1.4 自定义Sink输出 1、输出算子Sink Flink作为数据处理框架最终还是要把计算处理的结果写入外部储存为外部应用提供支持。 1.1 连接到外部系统 Flink的DataStream API专门提供了向外部提供写入数据的方法addSink。与addSource类似addSink方法对应着一个“Sink”算子主要就是用来实现与外部系统连接、并将数据提交写入的。Flink程序中所有对外的输出操作一般都是利用Sink算子完成的。 Flink1.12以前Sink算子的创建是通过调用DataStream的.addSink()方法实现的。 stream.addSink(new SinkFunction(…));addSink方法同样需要传入一个参数实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke()用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。 Flink1.12开始同样重构了Sink架构 stream.sinkTo(…)当然Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示列出了Flink官方目前支持的第三方系统连接器 我们可以看到像Kafka之类流式系统Flink提供了完美对接source/sink两端都能连接可读可写而对于Elasticsearch、JDBC等数据存储系统则只提供了输出写入的sink连接器。 除Flink官方之外Apache Bahir框架也实现了一些其他第三方系统与Flink的连接器。 除此以外就需要用户自定义实现sink连接器了。 1.2 输出到文件 Flink专门提供了一个流式文件系统的连接器FileSink为批处理和流处理提供了一个统一的Sink它可以将分区文件写入Flink支持的文件系统。 FileSink支持行编码Row-encoded和批量编码Bulk-encoded格式。这两种不同的方式都有各自的构建器builder可以直接调用FileSink的静态方法 行编码 FileSink.forRowFormatbasePathrowEncoder。批量编码 FileSink.forBulkFormatbasePathbulkWriterFactory。 public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中都有 并行度个数的 文件在写入env.setParallelism(2);// 必须开启checkpoint否则一直都是 .inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataGeneratorSourceString dataGeneratorSource new DataGeneratorSource(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return Number: value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING);DataStreamSourceString dataGen env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), data-generator);// 输出到文件系统FileSinkString fieSink FileSink// 输出行式存储的文件指定路径、指定编码.StringforRowFormat(new Path(f:/tmp), new SimpleStringEncoder(UTF-8))// 输出文件的一些配置 文件名的前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix(atguigu-).withPartSuffix(.log).build())// 按照目录分桶如下就是每个小时一个目录.withBucketAssigner(new DateTimeBucketAssigner(yyyy-MM-dd HH, ZoneId.systemDefault()))// 文件滚动策略: 1分钟 或 1m.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(new MemorySize(1024*1024)).build()).build();dataGen.sinkTo(fieSink);env.execute();} }1.3 输出到Kafka 1添加Kafka 连接器依赖 由于我们已经测试过从Kafka数据源读取数据连接器相关依赖已经引入这里就不重复介绍了。 2启动Kafka集群 3编写输出到Kafka的示例代码 输出无key的record: public class SinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是精准一次必须开启checkpoint后续章节介绍env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperatorString sensorDS env.socketTextStream(hadoop102, 7777);/*** Kafka Sink:* TODO 注意如果要使用 精准一次 写入Kafka需要满足以下条件缺一不可* 1、开启checkpoint后续介绍* 2、设置事务前缀* 3、设置事务超时时间 checkpoint间隔 事务超时时间 max的15分钟*/KafkaSinkString kafkaSink KafkaSink.Stringbuilder()// 指定 kafka 的地址和端口.setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092)// 指定序列化器指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.Stringbuilder().setTopic(ws).setValueSerializationSchema(new SimpleStringSchema()).build())// 写到kafka的一致性级别 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次必须设置 事务的前缀.setTransactionalIdPrefix(atguigu-)// 如果是精准一次必须设置 事务超时时间: 大于checkpoint间隔小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000).build();sensorDS.sinkTo(kafkaSink);env.execute();} }自定义序列化器实现带key的record: public class SinkKafkaWithKey {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperatorString sensorDS env.socketTextStream(hadoop102, 7777);/*** 如果要指定写入kafka的key可以自定义序列化器* 1、实现 一个接口重写 序列化 方法* 2、指定key转成 字节数组* 3、指定value转成 字节数组* 4、返回一个 ProducerRecord对象把key、value放进去*/KafkaSinkString kafkaSink KafkaSink.Stringbuilder().setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092).setRecordSerializer(new KafkaRecordSerializationSchemaString() {NullableOverridepublic ProducerRecordbyte[], byte[] serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas element.split(,);byte[] key datas[0].getBytes(StandardCharsets.UTF_8);byte[] value element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord(ws, key, value);}}).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix(atguigu-).setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 ).build();sensorDS.sinkTo(kafkaSink);env.execute();} }运行代码在Linux主机启动一个消费者查看是否收到数据 bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws1.4 输出到MySQLJDBC 1添加依赖 !--mysql驱动 -- dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.27/version /dependency dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc/artifactIdversion3.1.0-1.17/version /dependency2启动MySQL在test库下建表 CREATE TABLE ws (id varchar(100) NOT NULL,ts bigint(20) DEFAULT NULL,vc int(11) DEFAULT NULL,PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf83输出到MySQL的示例代码 public class SinkMySQL {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction());/*** TODO 写入mysql* 1、只能用老的sink写法 addsink* 2、JDBCSink的4个参数:* 第一个参数 执行的sql一般就是 insert into* 第二个参数 预编译sql 对占位符填充值* 第三个参数 执行选项 ---》 攒批、重试* 第四个参数 连接选项 ---》 url、用户名、密码*/SinkFunctionWaterSensor jdbcSink JdbcSink.sink(insert into ws values(?,?,?),new JdbcStatementBuilderWaterSensor() {Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {//每收到一条WaterSensor如何去填充占位符preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());}},JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(100) // 批次的大小条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(jdbc:mysql://hadoop102:3306/test?serverTimezoneAsia/ShanghaiuseUnicodetruecharacterEncodingUTF-8).withUsername(root).withPassword(000000).withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build());sensorDS.addSink(jdbcSink);env.execute();} }4运行代码用客户端连接MySQL查看是否成功写入数据。 1.4 自定义Sink输出 如果我们想将数据存储到我们自己的存储设备中而Flink并没有提供可以直接使用的连接器就只能自定义Sink进行输出了。与Source类似Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类只要实现它通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。 stream.addSink(new MySinkFunctionString());在实现SinkFunction的时候需要重写的一个关键方法invoke()在这个方法中我们就可以实现将流里的数据发送出去的逻辑。 这种方式比较通用对于任何外部存储系统都有效不过自定义Sink想要实现状态一致性并不容易所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现而且在不断地扩充因此自定义的场景并不常见。                       您的支持是我创作的无限动力 希望我能为您的未来尽绵薄之力 如有错误谢谢指正若有收获谢谢赞美
http://www.zqtcl.cn/news/634726/

相关文章:

  • 做职业资格考试的网站有哪些网页游戏排行榜2024前十名
  • 网站设计方案怎么写wordpress仿站软件
  • 汕头建站模板系统北京有哪些电商平台公司
  • 深圳网站建设zhaoseo小包工头接活的平台
  • 电商平面设计前景如何seo推广什么意思
  • 网站解析不了wordpress 密码失败
  • 临沂企业建站系统模板扮家家室内设计
  • 做简单网站用什么软件网站开发国外研究现状
  • 江苏seo推广网站建设湖南软件定制开发
  • 台州商务网站手机端seo
  • 网站的切换语言都是怎么做的有哪些开发网站公司
  • 上海人才中心网站湖州建设公司网站
  • 网站的前台后台网站建设公司新报
  • 菜鸟式网站建设图书深圳建站公司好坏
  • 品牌网站建设熊掌号一级消防工程师考试通过率多少
  • 网站建设淘宝客模板湖口网站建设
  • 拱墅区建设局网站做设计的搜素材上什么网站
  • 济南烨铭网站建设外贸建网站免费模板
  • 那些网站可以做反链浏览器网站大全
  • 泉州网站建设推广企业网页兼容性站点
  • 怎样做视频上网站赚钱推广计划怎么做推广是什么
  • 台州外贸网站建设做网站开发一般用什么语言
  • 咸阳做网站的公司漯河网做网站
  • 红酒网站模板下载做网站加推广
  • 免费网站服务器域名在线手机网站建设
  • 北京网站ui设计公司在线设计装修
  • 大学生网站作业北京网站优化技术
  • 静安区网站开发固原网络推广
  • WordPress网站修改志成网站设计制作
  • 做网站需要注意的昭通网站seo优化