快速网站建设公司哪家好,react 网站开发,公司部门职位,网站开发待遇好吗学习笔记 Flink作为数据处理框架#xff0c;最终还是要把计算处理的结果写入外部存储#xff0c;为外部应用提供支持。 文章目录 **连接到外部系统****输出到文件**输出到 Kafka输出到 mysql自定义 sink 连接到外部系统
Flink的DataStream API专门提供了向外部写入数据的方… 学习笔记 Flink作为数据处理框架最终还是要把计算处理的结果写入外部存储为外部应用提供支持。 文章目录 **连接到外部系统****输出到文件**输出到 Kafka输出到 mysql自定义 sink 连接到外部系统
Flink的DataStream API专门提供了向外部写入数据的方法addSink。与addSource类似addSink方法对应着一个“Sink”算子主要就是用来实现与外部系统连接、并将数据提交写入的Flink程序中所有对外的输出操作一般都是利用Sink算子完成的。 Flink1.12以前Sink算子的创建是通过调用DataStream的.addSink()方法实现的。 stream.addSink(new SinkFunction(…)); addSink方法同样需要传入一个参数实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke()用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。 Flink1.12开始同样重构了Sink架构 stream.sinkTo(…) 当然Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示列出了Flink官方目前支持的第三方系统连接器 https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/connectors/datastream/overview/ 我们可以看到像Kafka之类流式系统Flink提供了完美对接source/sink两端都能连接可读可写而对于Elasticsearch、JDBC等数据存储系统则只提供了输出写入的sink连接器。 除Flink官方之外Apache Bahir框架也实现了一些其他第三方系统与Flink的连接器。 除此以外就需要用户自定义实现sink连接器了。
输出到文件
Flink专门提供了一个流式文件系统的连接器FileSink为批处理和流处理提供了一个统一的Sink它可以将分区文件写入Flink支持的文件系统。 FileSink支持行编码Row-encoded和批量编码Bulk-encoded格式。这两种不同的方式都有各自的构建器builder可以直接调用FileSink的静态方法
行编码 FileSink.forRowFormatbasePathrowEncoder。批量编码 FileSink.forBulkFormatbasePathbulkWriterFactory。
public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中都有 并行度个数的 文件在写入env.setParallelism(2);// 必须开启checkpoint否则一直都是 .inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataGeneratorSourceString dataGeneratorSource new DataGeneratorSource(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return Number: value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING);DataStreamSourceString dataGen env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), data-generator);// 输出到文件系统FileSinkString fieSink FileSink// 输出行式存储的文件指定路径、指定编码.StringforRowFormat(new Path(f:/tmp), new SimpleStringEncoder(UTF-8))// 输出文件的一些配置 文件名的前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix(atguigu-).withPartSuffix(.log).build())// 按照目录分桶如下就是每个小时一个目录.withBucketAssigner(new DateTimeBucketAssigner(yyyy-MM-dd HH, ZoneId.systemDefault()))// 文件滚动策略: 1分钟 或 1m.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(new MemorySize(1024*1024)).build()).build();dataGen.sinkTo(fieSink);env.execute();}
}
输出到 Kafka
1添加Kafka 连接器依赖 由于我们已经测试过从Kafka数据源读取数据连接器相关依赖已经引入这里就不重复介绍了。 2启动Kafka集群 3编写输出到Kafka的示例代码
public class SinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是精准一次必须开启checkpoint后续章节介绍env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperatorString sensorDS env.socketTextStream(hadoop102, 7777);/*** Kafka Sink:* TODO 注意如果要使用 精准一次 写入Kafka需要满足以下条件缺一不可* 1、开启checkpoint后续介绍* 2、设置事务前缀* 3、设置事务超时时间 checkpoint间隔 事务超时时间 max的15分钟*/KafkaSinkString kafkaSink KafkaSink.Stringbuilder()// 指定 kafka 的地址和端口.setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092)// 指定序列化器指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.Stringbuilder().setTopic(ws).setValueSerializationSchema(new SimpleStringSchema()).build())// 写到kafka的一致性级别 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次必须设置 事务的前缀.setTransactionalIdPrefix(atguigu-)// 如果是精准一次必须设置 事务超时时间: 大于checkpoint间隔小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000).build();sensorDS.sinkTo(kafkaSink);env.execute();}
}自定义序列化器实现带key的record:
public class SinkKafkaWithKey {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperatorString sensorDS env.socketTextStream(hadoop102, 7777);/*** 如果要指定写入kafka的key可以自定义序列化器* 1、实现 一个接口重写 序列化 方法* 2、指定key转成 字节数组* 3、指定value转成 字节数组* 4、返回一个 ProducerRecord对象把key、value放进去*/KafkaSinkString kafkaSink KafkaSink.Stringbuilder().setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092).setRecordSerializer(new KafkaRecordSerializationSchemaString() {NullableOverridepublic ProducerRecordbyte[], byte[] serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas element.split(,);byte[] key datas[0].getBytes(StandardCharsets.UTF_8);byte[] value element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord(ws, key, value);}}).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix(atguigu-).setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 ).build();sensorDS.sinkTo(kafkaSink);env.execute();}
}输出到 mysql
写入数据的MySQL的测试步骤如下。 1添加依赖 添加MySQL驱动
dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.27/version
/dependency官方还未提供flink-connector-jdbc的1.17.0的正式依赖暂时从apache snapshot仓库下载pom文件中指定仓库路径
repositoriesrepositoryidapache-snapshots/idnameapache snapshots/name
urlhttps://repository.apache.org/content/repositories/snapshots//url/repository
/repositories添加依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc/artifactIdversion1.17-SNAPSHOT/version
/dependency如果不生效还需要修改本地maven的配置文件mirrorOf中添加如下标红内容
mirroridaliyunmaven/idmirrorOf*,!apache-snapshots/mirrorOfname阿里云公共仓库/nameurlhttps://maven.aliyun.com/repository/public/url
/mirror2启动MySQL在test库下建表ws
mysql
CREATE TABLE ws (
id varchar(100) NOT NULL,
ts bigint(20) DEFAULT NULL,
vc int(11) DEFAULT NULL,
PRIMARY KEY (id)
) ENGINEInnoDB DEFAULT CHARSETutf83编写输出到MySQL的示例代码
public class SinkMySQL {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperatorWaterSensor sensorDS env
.socketTextStream(hadoop102, 7777)
.map(new WaterSensorMapFunction());/*** TODO 写入mysql* 1、只能用老的sink写法 addsink* 2、JDBCSink的4个参数:* 第一个参数 执行的sql一般就是 insert into* 第二个参数 预编译sql 对占位符填充值* 第三个参数 执行选项 ---》 攒批、重试* 第四个参数 连接选项 ---》 url、用户名、密码*/
SinkFunctionWaterSensor jdbcSink JdbcSink.sink(insert into ws values(?,?,?),new JdbcStatementBuilderWaterSensor() {Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {//每收到一条WaterSensor如何去填充占位符preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());}},JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(100) // 批次的大小条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(jdbc:mysql://hadoop102:3306/test?serverTimezoneAsia/ShanghaiuseUnicodetruecharacterEncodingUTF-8).withUsername(root).withPassword(000000).withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build()
);sensorDS.addSink(jdbcSink);env.execute();
}
}4运行代码用客户端连接MySQL查看是否成功写入数据。
自定义 sink
如果我们想将数据存储到我们自己的存储设备中而Flink并没有提供可以直接使用的连接器就只能自定义Sink进行输出了。与Source类似Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类只要实现它通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。 stream.addSink(new MySinkFunction()); 在实现SinkFunction的时候需要重写的一个关键方法invoke()在这个方法中我们就可以实现将流里的数据发送出去的逻辑。 这种方式比较通用对于任何外部存储系统都有效不过自定义Sink想要实现状态一致性并不容易所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现而且在不断地扩充因此自定义的场景并不常见。