当前位置: 首页 > news >正文

重庆企业建站系统模板南安市住房和城乡建设部网站

重庆企业建站系统模板,南安市住房和城乡建设部网站,wordpress登录界面图标,网页程序开发采购前言 今天是我写博客的第 200 篇#xff0c;恍惚间两年过去了#xff0c;现在已经是大三的学长了。仍然记得两年前第一次写博客的时候#xff0c;当时学的应该是 Java 语言#xff0c;菜的一批#xff0c;写了就删#xff0c;怕被人看到丢脸。当时就想着自己一年之后恍惚间两年过去了现在已经是大三的学长了。仍然记得两年前第一次写博客的时候当时学的应该是 Java 语言菜的一批写了就删怕被人看到丢脸。当时就想着自己一年之后两年之后能学到什么水平什么是 JDBC、什么是 MVC、SSM在当时都是特别好奇的东西不过都在后来的学习中慢慢接触到并且好多已经烂熟于心了。 那今天我在畅想一下一年后的今天我又学到了什么水平能否达到三花聚顶、草木山石皆可为码的超凡入圣的境界拿没拿到心仪的 offer和那个心动过的女孩相处怎么样了哈哈哈哈哈 输出算子Sink 学完了 Flink 在不同执行环境本地测试环境和集群环境下的多种读取多种数据源和转换操作多种转换算子最后就是输出操作了。 1、连接到外部系统 Flink 1.12 之前Sink 算子是通过调用 DataStream 的 addSink 方法来实现的 stream.addSink(new SinkFunction(...)); 从 Flink 1.12 开始Flink 重构了 Sink 架构 stream.sinkTo(...) 查看 Flink 支持的连接器 需要我们自己导入依赖比如上面的 Kfaka 和 DataGen 我们之前使用的时候都导入过相关依赖需要知道有的是只支持source有的只支持sink有的全都支持。 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-datagen/artifactIdversion${flink.version}/version/dependency 2、输出到文件 Flink 专门提供了一个流式文件系统的连接器FileSink为批处理和流处理提供了一个统一的 Sink它可以将分区文件写入 Flink支持的文件系统。         它的主要操作是将数据写入桶buckets每个桶中的数据都可以分割成一个个大小有限的分区文件这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶”的操作默认的分桶方式是基于时间的我们每小时写入一个新的桶。换句话说每个桶内保存的文件记录的都是 1 小时的输出数据。         FileSink 支持行编码Row-encoded和批量编码Bulk-encoded比如 Parquet格式。这两种不同的方式都有各自的构建器builder调用方法也非常简单可以直接调用 FileSink 的静态方法 行编码FileSink.forRowFormatbasePathrowEncoder。批量编码FileSink.forBulkFormatbasePathbulkWriterFactory。 在创建行或批量编码 Sink 时我们需要传入两个参数用来指定存储桶的基本路径basePath和数据的编码逻辑rowEncoder 或 bulkWriterFactory。 package com.lyh.sink;import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import org.apache.flink.configuration.MemorySize; import org.apache.flink.connector.datagen.source.DataGeneratorSource; import org.apache.flink.connector.datagen.source.GeneratorFunction; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;import java.time.Duration; import java.time.ZoneId;/*** author 刘xx* version 1.0* date 2023-11-18 9:51*/ public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 必须开启 检查点 不然一直都是 .inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataGeneratorSourceString dataGeneratorSource new DataGeneratorSourceString(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return Number:value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(10), // 每s 10条Types.STRING);DataStreamSourceString dataGen env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), data-generate);// todo 输出到文件系统FileSinkString fileSink FileSink.// 泛型方法 需要和输出结果的泛型保持一致StringforRowFormat(new Path(D:/Desktop), // 指定输出路径 可以是 hdfs:// 路径new SimpleStringEncoder(UTF-8)) // 指定编码.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix(lyh).withPartSuffix(.log).build())// 按照目录分桶 一个小时一个目录(这里的时间格式别改为分钟 会报错: flink Relative path in absolute URI:).withBucketAssigner(new DateTimeBucketAssigner(yyyy-MM-dd HH, ZoneId.systemDefault()))// 设置文件滚动策略-时间或者大小 10s 或 1KB 或 5min内没有新数据写入 滚动一次// 滚动的时候 文件就会更名为我们设定的格式(前缀)不再写入.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofSeconds(10L)) // 10s.withMaxPartSize(new MemorySize(1024)) // 1KB.withInactivityInterval(Duration.ofMinutes(5)) // 5min.build()).build();dataGen.sinkTo(fileSink);env.execute();} }这里我们创建了一个简单的文件 Sink通过.withRollingPolicy()方法指定了一个“滚动策略”。“滚动”的概念在日志文件的写入中经常遇到因为文件会有内容持续不断地写入所以我们应该给一个标准到什么时候就开启新的文件将之前的内容归档保存。也就是说上面的代码设置了在以下 3 种情况下我们就会滚动分区文件 ⚫ 至少包含 10 秒的数据 ⚫ 最近 5 分钟没有收到新的数据 ⚫ 文件大小已达到 1 KB 通过 withOutputFileConfig()方法指定了输出的文件名前缀和后缀。 需要特别注意的就是一定要开启检查点否则我们的数据一直都是正在写入的状态具体原因后面学习到检查点的时候会详细说。 运行结果 3、输出到 Kafka 需要添加 Kafka 依赖之前导入过了启动 Kafka编写示例代码 package com.lyh.sink;import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.kafka.clients.producer.ProducerConfig;/*** author 刘xx* version 1.0* date 2023-11-18 11:20*/ public class SinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是 精准一次 必须开启 checkpointenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperatorString sensorDS env.socketTextStream(localhost, 9999);KafkaSinkString kafkaSink KafkaSink.Stringbuilder()// 指定 kafka 的地址和端口.setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092)// 指定序列化器 我们是发送方 所以我们是生产者.setRecordSerializer(KafkaRecordSerializationSchema.Stringbuilder().setTopic(like).setValueSerializationSchema(new SimpleStringSchema()).build())// 写到 kafka 的一致性级别: 精准一次 / 至少一次// 如果是精准一次// 1.必须开启检查点 env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)// 2.必须设置事务的前缀// 3.必须设置事务的超时时间: 大于 checkpoint间隔 小于 max 15分钟.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix(lyh-).setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,10*60*1000).build();sensorDS.sinkTo(kafkaSink);env.execute();} }启动 kafka 并开启一个消费者 kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic like运行结果 需要特别注意的三点 如果是精准一次1.必须开启检查点 env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)2.必须设置事务的前缀3.必须设置事务的超时时间: 大于 checkpoint间隔 小于 max 15分钟 自定义序列化器 我们上面用的自带的序列化器但是如果我们有 key 的话就需要自定义序列化器了替换上面的代码 .setRecordSerializer(/*** 如果要指定写入 kafka 的key 就需要自定义序列化器* 实现一个接口 重写序列化方法* 指定key 转为 bytes[]* 指定value 转为 bytes[]* 返回一个 ProducerRecord(topic名,key,value)对象*/new KafkaRecordSerializationSchemaString() {NullableOverride// ProducerRecordbyte[], byte[] 返回一个生产者消息,key,value 分别对应两个字节数组public ProducerRecordbyte[], byte[] serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas element.split(,);byte[] key datas[0].getBytes(StandardCharsets.UTF_8);byte[] value element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord(like,key,value);}} ) 运行结果  4、输出到 MySQL 添加依赖1.17版本的依赖需要指定仓库才能找到因为阿里云和默认的maven仓库是没有的 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc/artifactIdversion1.17-SNAPSHOT/version/dependency dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.31/version/dependency....repositoriesrepositoryidapache-snapshots/idnameapache snapshots/nameurlhttps://repository.apache.org/content/repositories/snapshots//url/repository/repositories 创建表格  编写代码将输入的数据行分隔为对象参数每行数据生成一个对象进行处理。  package com.lyh.sink;import com.lyh.bean.WaterSensor; import function.WaterSensorFunction; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.PreparedStatement; import java.sql.SQLException;/*** author 刘xx* version 1.0* date 2023-11-18 12:32*/ public class SinkMySQL {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 9999).map(new WaterSensorFunction()); //输入进来的数据自动转为 WaterSensor类型/*** todo 写入 mysql* 1.这里需要用旧的sink写法addSink* 2.JDBC的4个参数* (1) 执行的sql语句* (2) 对占位符进行填充* (3) 执行选项 - 攒批,重试* (4) 连接选项 - driver,username,password,url*/SinkFunctionWaterSensor jdbcSink JdbcSink.sink(insert into flink.ws values(?,?,?),// 指定 sql 中占位符的值new JdbcStatementBuilderWaterSensor() {Overridepublic void accept(PreparedStatement stmt, WaterSensor sensor) throws SQLException {// 占位符从 1 开始stmt.setString(1, sensor.getId());stmt.setLong(2, sensor.getTs());stmt.setInt(3, sensor.getVc());}}, JdbcExecutionOptions.builder().withMaxRetries(3) //最多重试3次(不包括第一次,共4次).withBatchSize(100) //每收集100条记录进行一次写入.withBatchIntervalMs(3000) // 批次3s(即使没有达到100条记录,只要过了3s JDBCSink也会进行记录的写入),这有助于确保数据及时写入而不是无限期地等待批处理大小达到。.build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(jdbc:mysql://localhost:3306/flink?serverTimezoneAsia/ShanghaiuseUnicodetruecharacterEncodingUTF-8).withDriverName(com.mysql.cj.jdbc.Driver).withUsername(root).withPassword(Yan1029.)// mysql 默认8小时不使用连接就主动断开连接.withConnectionCheckTimeoutSeconds(60) // 重试连接直接的间隔,上面我们设置最多重试3次,每次间隔60s.build());sensorDS.addSink(jdbcSink);env.execute();} }查询结果 5、自定义 Sink 输出 与 Source 类似Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction抽象类只要实现它通过简单地调用 DataStream 的.addSink()方法就可以自定义写入任何外部存储。 这里我们自定义实现一个向 HBase 中插入数据的 Sink。 注意这里只是做一个简单的 Demo下面的代码不难发现我们只是对 nosq:student 表下的 info:name 进行了两次的覆盖。如果要实现复杂的处理功能需要对数据类型进行定义因为 HBase 的数据是按列存储的所以对于复杂的 Hbase 表我们难以通过 Java bean 来插入数据。而且一般经常用的连接器Flink 大部分已经提供了开发中我们一般也很少自定义 Sink 输出。 package com.lyh.sink;import com.lyh.utils.HBaseConnection; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table;import java.nio.charset.StandardCharsets;/*** author 刘xx* version 1.0* date 2023-11-18 15:59*/ public class SinkCustomHBase {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.fromElements(tom,bob).addSink(new RichSinkFunctionString() {public Connection con;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);con HBaseConnection.getConnection(hadoop102:2181);}Overridepublic void invoke(String value, Context context) throws Exception {super.invoke(value, context);Table table con.getTable(TableName.valueOf(nosql,student));Put put new Put(1001.getBytes(StandardCharsets.UTF_8));put.addColumn(info.getBytes(StandardCharsets.UTF_8),name.getBytes(StandardCharsets.UTF_8),value.getBytes(StandardCharsets.UTF_8));table.put(put);table.close();}Overridepublic void close() throws Exception {super.close();HBaseConnection.close();}});env.execute();} }这里用到一个简单的连接 HBase 的工具类   package com.lyh.utils;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory;import java.io.IOException;/*** author 刘xx* version 1.0* date 2023-11-18 16:04*/ public class HBaseConnection {private static Connection connection;public static Connection getConnection(String hosts) throws IOException {Configuration conf new Configuration();conf.set(hbase.zookeeper.quorum, hosts);conf.setInt(hbase.rpc.timeout, 10000); // 设置最大超时 10 sconnection ConnectionFactory.createConnection(conf);return connection;}public static void close() throws IOException {if (connection!null)connection.close();} }
http://www.zqtcl.cn/news/707596/

相关文章:

  • 合肥网站搭建著名的网站建设公司
  • win7的iis怎么制作网站网页制作基础代码
  • 黄页网站大全免费网在线进一步优化供给推动消费平稳增长
  • dw中怎样做网站链接网页版qq登录入口账号密码
  • 外贸网站建设soho中国建设银行网站易方达消费
  • 淘宝客网站推广怎么做图文识别微信小程序是什么
  • 郑州网站建设、北京做网页公司
  • 代码错误网站wordpress主题屏蔽更新
  • 建五金方面的网站广告联盟app手机版
  • 宜宾建设网站公众号怎么制作流程
  • 上海崇明网站建设崇信县门户网站首页
  • 北京手机版建站系统开发学网页设计需要什么学历
  • 英文网站备案互联网排名前十的公司2021
  • 网站外部外链建设如何开发wordpress主题
  • 个人网站首页内容辽宁省建设网站
  • 二建证从住房建设厅网站调出流程需求分析 网站
  • 鞋子网站模板做网站开发学什么软件
  • 网站建设的需求客户中企动力科技股份有限公司招聘
  • 小程序定制 seo营销seo托管公司
  • 杭州网站设计公司联系亿企邦网站建设在电访销售话术
  • 安康网站开发公司报价网站开发人员考核
  • 谷歌网站 百度清苑住房和城乡建设局网站
  • 南宁世尊商贸网站建设如何查看一个网站是否备案
  • h5手机网站怎么做搜索引擎关键词怎么选
  • 弱电网站源码工程造价建设信息网站
  • 村级网站模板做公司永久免费网站什么好
  • 厦门做网站培训安康市电梯公司
  • 江苏水利建设网站排行榜百度
  • 营销导向的企业网站优化wordpress制作企业
  • 株洲网站建设公司wordpress资讯类主题破解版