# Flink Study Notes

Preface: Today is day 14 of learning Flink! I studied the advanced/new Flink features of data types and Avro serialization. They mainly address standardized writing and standardized reading of data in the big-data field, and the Avro data format also saves storage space. In this post I work through a realistic enterprise scenario (reading from and writing to Kafka with custom serialization), combine it with my own experiments, guesses, and code practice, and summarize my own understanding and ideas. I hope to exchange thoughts with all of you!

Tips: Sharing is the source of happiness! My blog holds not only an ocean of knowledge but also plenty of positive energy; come and share this joy with me! If you like my blog, remember to leave a like ❤️ and a follow; your support is the motivation behind my writing!

## IV. Flink Advanced Features and New Features
### 4. Data Types and Serialization
#### 4.1 Data Types
Flink supports seven categories of data types (per the Flink documentation these are, roughly: Java Tuples and Scala case classes, Java POJOs, primitive types, regular classes, Values, Hadoop Writables, and special types such as Either).

#### 4.2 POJO Type Details
Notes:
- The class must be declared public.
- The class must have a public no-argument constructor.
- All non-static, non-transient fields must be public; if a field is not public, it must have standard getter and setter methods.
- Every field must be of a data type that Flink supports.
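As an illustration (my own minimal sketch with a hypothetical class name, not from the course material), a class like the following would be treated by Flink as a POJO type:

```java
// Hypothetical POJO that satisfies all of the rules above.
public class OrderPojo {
    // all fields are public and of Flink-supported types
    public String userId;
    public double money;

    // public no-argument constructor is required
    public OrderPojo() {
    }

    public OrderPojo(String userId, double money) {
        this.userId = userId;
        this.money = money;
    }
}
```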
#### 4.3 Advantages of Avro

- Avro is a data serialization system that supports applications exchanging data in bulk.
- It supports binary serialization: good performance / high efficiency, with schemas described in JSON.
- It is friendly to dynamic languages, and its RPC support covers both synchronous and asynchronous communication.

#### 4.4 Defining the Avro Schema (JSON)

- namespace: the package (directory) the class will be generated into
- type: the type; for Avro this must be "record"
- name: the name of the class that will be generated automatically
- fields: the fields to declare
Note: the schema file's extension must be .avsc, not .avro. Use IDEA to generate the Order object from it.
```json
{
  "namespace": "cn.itcast.beans",
  "type": "record",
  "name": "OrderModel",
  "fields": [
    {"name": "userId", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "money", "type": "double"},
    {"name": "category", "type": "string"}
  ]
}
```

Note: when adding the pom dependencies, watch out for plugin conflicts. Comment out the following dependency, otherwise the build keeps throwing errors:
```xml
<!-- This conflicts with avro, so comment it out for now -->
<!-- <dependency>-->
<!--     <groupId>org.apache.hive</groupId>-->
<!--     <artifactId>hive-exec</artifactId>-->
<!--     <version>2.1.0</version>-->
<!-- </dependency>-->
```

Quick look (1): add the required dependencies to the pom file.
```xml
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.5.1</version>
</dependency>
```

Quick look (2): add the required plugin to the pom file.
```xml
<!-- avro compiler plugin -->
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.8.2</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>
```
#### 4.5 Custom Serialization to Kafka with Plain Java

##### 4.5.1 Prepare the Data
order.csv:

```
user_001,1621718199,10.1,电脑
user_001,1621718201,14.1,手机
user_002,1621718202,82.5,手机
user_001,1621718205,15.6,电脑
user_004,1621718207,10.2,家电
user_001,1621718208,15.8,电脑
user_005,1621718212,56.1,电脑
user_002,1621718260,40.3,家电
user_001,1621718580,11.5,家居
user_001,1621718860,61.6,家居
```

##### 4.5.2 Custom Avro Serializer and Deserializer
First we need to implement two Kafka interfaces, Serializer and Deserializer, for serialization and deserialization respectively.
```java
package cn.itcast.day14.serialization_java;

/**
 * @author lql
 * @time 2024-03-10 16:29:49
 * @description TODO
 */

import cn.itcast.day14.beans.OrderModel;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import org.apache.kafka.common.serialization.*;

/**
 * Custom serialization and deserialization
 */
public class SimpleAvroSchemaJava implements Serializer<OrderModel>, Deserializer<OrderModel> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public void close() {
    }

    @Override
    public byte[] serialize(String s, OrderModel order) {
        // create the Avro writer for this record's schema
        SpecificDatumWriter<OrderModel> writer = new SpecificDatumWriter<>(order.getSchema());
        // create a stream that collects the serialized bytes
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // create the binary encoder
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        try {
            // write the record into the stream
            writer.write(order, encoder);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return out.toByteArray();
    }

    @Override
    public OrderModel deserialize(String s, byte[] bytes) {
        // object that will hold the result
        OrderModel order = new OrderModel();
        // input stream used to read the binary payload
        ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(bytes);
        // create the Avro reader for this record's schema
        SpecificDatumReader<OrderModel> stockSpecificDatumReader = new SpecificDatumReader<>(order.getSchema());
        // create the binary decoder
        BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(arrayInputStream, null);
        try {
            // read the record
            order = stockSpecificDatumReader.read(null, binaryDecoder);
        } catch (IOException e) {
            e.printStackTrace();
        }
        // return the result
        return order;
    }
}
```
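As a quick sanity check of the round trip (my own snippet with a hypothetical class name, not part of the original notes), you can serialize an OrderModel and immediately deserialize it back:

```java
package cn.itcast.day14.serialization_java;

import cn.itcast.day14.beans.OrderModel;

// Hypothetical round-trip check for SimpleAvroSchemaJava.
public class AvroRoundTripCheck {
    public static void main(String[] args) {
        SimpleAvroSchemaJava schema = new SimpleAvroSchemaJava();
        OrderModel order = new OrderModel("user_001", 1621718199L, 10.1, "电脑");

        byte[] bytes = schema.serialize("order", order);          // Avro binary encoding
        OrderModel decoded = schema.deserialize("order", bytes);  // decode back into an OrderModel

        System.out.println(decoded);  // should print the same field values as the original record
    }
}
```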
##### 4.5.3 Create the Producer Utility Class

```java
package cn.itcast.day14.serialization_java;

import cn.itcast.day14.beans.OrderModel;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
 * @author lql
 * @time 2024-03-10 16:33:31
 * @description TODO
 */
public class OrderProducerJava {
    public static void main(String[] args) {
        // load the data
        List<OrderModel> data = getData();
        System.out.println(data);
        try {
            // build the producer configuration
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "node1:9092");
            // the key is still serialized as a plain String
            props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // the value serializer must point to our custom Avro serializer
            props.setProperty("value.serializer", "cn.itcast.day14.serialization_java.SimpleAvroSchemaJava");
            // create the Kafka producer
            KafkaProducer<String, OrderModel> userBehaviorProducer = new KafkaProducer<String, OrderModel>(props);
            // loop over the records and send them one by one
            for (OrderModel orderModel : data) {
                ProducerRecord<String, OrderModel> producerRecord = new ProducerRecord<String, OrderModel>("order", orderModel);
                userBehaviorProducer.send(producerRecord);
                System.out.println("数据写入成功" + data);
                Thread.sleep(1000);
            }
            // close the producer so that buffered records are flushed before the JVM exits
            userBehaviorProducer.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static List<OrderModel> getData() {
        ArrayList<OrderModel> orderModels = new ArrayList<OrderModel>();
        try {
            BufferedReader br = new BufferedReader(new FileReader(new File("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\order.csv")));
            String line = "";
            while ((line = br.readLine()) != null) {
                String[] fields = line.split(",");
                orderModels.add(new OrderModel(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]), fields[3]));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return orderModels;
    }
}
```

##### 4.5.4 Create the Consumer Utility Class
```java
package cn.itcast.day14.serialization_java;

import cn.itcast.day14.beans.OrderModel;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
/**
 * @author lql
 * @time 2024-03-10 16:38:29
 * @description TODO
 */
public class OrderConsumerJava {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "node1:9092");
        prop.put("group.id", "order");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // set the value deserializer to our custom Avro deserializer
        prop.put("value.deserializer", "cn.itcast.day14.serialization_java.SimpleAvroSchemaJava");
        KafkaConsumer<String, OrderModel> consumer = new KafkaConsumer<String, OrderModel>(prop);
        consumer.subscribe(Arrays.asList("order"));
        while (true) {
            // poll pulls records from Kafka
            ConsumerRecords<String, OrderModel> poll = consumer.poll(1000);
            for (ConsumerRecord<String, OrderModel> stringStockConsumerRecord : poll) {
                System.out.println(stringStockConsumerRecord.value());
            }
        }
    }
}
```
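A small side note: on kafka-clients 2.x the poll(long) overload used above is deprecated. If you want to avoid the deprecation warning, the equivalent call takes a Duration (this is just an optional tweak):

```java
// with: import java.time.Duration;
ConsumerRecords<String, OrderModel> poll = consumer.poll(Duration.ofMillis(1000));
```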
##### 4.5.5 Run the Program

```shell
# First start ZooKeeper

# Start Kafka (remember to run it in the background)
# background:
cd /export/servers/kafka_2.11-0.10.0.0
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

# stop:
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-server-stop.sh

# Create the topic
bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 1 --topic order
# Simulate a consumer
bin/kafka-console-consumer.sh --zookeeper node1:2181 --from-beginning --topic order
```

(Note: because the topic's values are Avro binary, the console consumer output is mostly unreadable; the structured records shown below come from running OrderProducerJava and OrderConsumerJava.)

Results:
Producer output:

```
[{"userId": "user_001", "timestamp": 1621718199, "money": 10.1, "category": "电脑"}, {"userId": "user_001", "timestamp": 1621718201, "money": 14.1, "category": "手机"}, {"userId": "user_002", "timestamp": 1621718202, "money": 82.5, "category": "手机"}, {"userId": "user_001", "timestamp": 1621718205, "money": 15.6, "category": "电脑"}, {"userId": "user_004", "timestamp": 1621718207, "money": 10.2, "category": "家电"}, {"userId": "user_001", "timestamp": 1621718208, "money": 15.8, "category": "电脑"}, {"userId": "user_005", "timestamp": 1621718212, "money": 56.1, "category": "电脑"}, {"userId": "user_002", "timestamp": 1621718260, "money": 40.3, "category": "家电"}, {"userId": "user_001", "timestamp": 1621718580, "money": 11.5, "category": "家居"}, {"userId": "user_001", "timestamp": 1621718860, "money": 61.6, "category": "家居"}]
数据写入成功
```

Consumer output:

```
{"userId": "user_001", "timestamp": 1621718199, "money": 10.1, "category": "电脑"}
{"userId": "user_001", "timestamp": 1621718201, "money": 14.1, "category": "手机"}
{"userId": "user_002", "timestamp": 1621718202, "money": 82.5, "category": "手机"}
{"userId": "user_001", "timestamp": 1621718205, "money": 15.6, "category": "电脑"}
{"userId": "user_004", "timestamp": 1621718207, "money": 10.2, "category": "家电"}
{"userId": "user_001", "timestamp": 1621718208, "money": 15.8, "category": "电脑"}
{"userId": "user_005", "timestamp": 1621718212, "money": 56.1, "category": "电脑"}
{"userId": "user_002", "timestamp": 1621718260, "money": 40.3, "category": "家电"}
{"userId": "user_001", "timestamp": 1621718580, "money": 11.5, "category": "家居"}
{"userId": "user_001", "timestamp": 1621718860, "money": 61.6, "category": "家居"}
```

Summary: the value serializer and deserializer must be set to the custom serialization class you defined.

#### 4.6 Custom Serialization to Kafka with Flink
##### 4.6.1 Prepare the Data
order.csv:

```
user_001,1621718199,10.1,电脑
user_001,1621718201,14.1,手机
user_002,1621718202,82.5,手机
user_001,1621718205,15.6,电脑
user_004,1621718207,10.2,家电
user_001,1621718208,15.8,电脑
user_005,1621718212,56.1,电脑
user_002,1621718260,40.3,家电
user_001,1621718580,11.5,家居
user_001,1621718860,61.6,家居
```

##### 4.6.2 Custom Avro Serialization and Deserialization
First we need to implement two Flink interfaces, SerializationSchema and DeserializationSchema, for serialization and deserialization respectively.
```java
package cn.itcast.day14.serialization_flink;

/**
 * @author lql
 * @time 2024-03-10 17:35:09
 * @description TODO
 */
import cn.itcast.day14.beans.OrderModel;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * Custom serialization and deserialization
 */
public class SimpleAvroSchemaFlink implements DeserializationSchema<OrderModel>, SerializationSchema<OrderModel> {

    @Override
    public byte[] serialize(OrderModel order) {
        // create the Avro writer for this record's schema
        SpecificDatumWriter<OrderModel> writer = new SpecificDatumWriter<>(order.getSchema());
        // create a stream that collects the serialized bytes
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // create the binary encoder
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        try {
            // write the record into the stream
            writer.write(order, encoder);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return out.toByteArray();
    }

    @Override
    public TypeInformation<OrderModel> getProducedType() {
        return TypeInformation.of(OrderModel.class);
    }

    @Override
    public OrderModel deserialize(byte[] bytes) throws IOException {
        // object that will hold the result
        OrderModel userBehavior = new OrderModel();
        // input stream used to read the binary payload
        ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(bytes);
        // create the Avro reader for this record's schema
        SpecificDatumReader<OrderModel> stockSpecificDatumReader = new SpecificDatumReader<>(userBehavior.getSchema());
        // create the binary decoder
        BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(arrayInputStream, null);
        try {
            // read the record
            userBehavior = stockSpecificDatumReader.read(null, binaryDecoder);
        } catch (IOException e) {
            e.printStackTrace();
        }
        // return the result
        return userBehavior;
    }

    @Override
    public boolean isEndOfStream(OrderModel userBehavior) {
        return false;
    }
}
```
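For reference only (not from the original notes): the flink-avro dependency added earlier also ships ready-made Avro schema classes, so depending on your Flink version you may be able to skip the hand-written class above. A minimal sketch, assuming the generated OrderModel class:

```java
import cn.itcast.day14.beans.OrderModel;
import org.apache.flink.formats.avro.AvroDeserializationSchema;

public class BuiltInAvroSchemaExample {
    public static void main(String[] args) {
        // built-in deserialization schema for Avro specific records (flink-avro module)
        AvroDeserializationSchema<OrderModel> deserSchema =
                AvroDeserializationSchema.forSpecific(OrderModel.class);
        System.out.println(deserSchema.getProducedType());

        // newer flink-avro versions also provide AvroSerializationSchema.forSpecific(OrderModel.class)
        // for the producer side; check your Flink version before relying on it.
    }
}
```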
##### 4.6.3 Create the Flink-source Class
```java
package cn.itcast.day14.serialization_flink;

/**
 * @author lql
 * @time 2024-03-10 17:37:15
 * @description TODO
 */
import cn.itcast.day14.beans.OrderModel;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class OrderProducerFlink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> value = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\order.csv");
        DataStream<OrderModel> orderModelDataStream = value.map(row -> {
            String[] fields = row.split(",");
            return new OrderModel(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]), fields[3]);
        });
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "node1:9092");
        // 4. connect to Kafka
        FlinkKafkaProducer<OrderModel> producer = new FlinkKafkaProducer<>(
                "order",
                new SimpleAvroSchemaFlink(),
                prop);
        // 5. write the data into Kafka
        orderModelDataStream.addSink(producer);
        // 6. run the job
        env.execute();
    }
}
```

##### 4.6.4 Create the Flink-sink Class
```java
package cn.itcast.day14.serialization_flink;

/**
 * @author lql
 * @time 2024-03-10 17:40:04
 * @description TODO
 */
import cn.itcast.day14.beans.OrderModel;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class OrderConsumerFlink {
    public static void main(String[] args) throws Exception {
        // 1. build the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // parallelism 1 makes the test output easier to follow
        // 2. set the Kafka configuration
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "node1:9092");
        prop.put("group.id", "UserBehavior");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // no value.deserializer is needed here, because SimpleAvroSchemaFlink is passed to the consumer below
        // 3. build the Kafka connector
        FlinkKafkaConsumer<OrderModel> kafka = new FlinkKafkaConsumer<OrderModel>("order", new SimpleAvroSchemaFlink(), prop);
        // 4. start consuming from the latest Kafka offsets
        kafka.setStartFromLatest();
        // 5. build the data source from Kafka
        DataStream<OrderModel> data = env.addSource(kafka);
        // 6. print the results
        data.print();
        env.execute();
    }
}
```

##### 4.6.5 Run the Program
Here I used the Kafka Tool 2.0.7 visualization tool to check the topic; I have put the tool package in my uploaded resources, so take a look if you are interested. Note that since OrderConsumerFlink starts from the latest offsets (setStartFromLatest), run the consumer job first and then OrderProducerFlink, otherwise the newly written records will be skipped.

Summary:

When I first started learning this topic, I honestly found it a struggle; it felt too abstract and hard to understand. But knowing that it is a must-have skill in real enterprise work, one that saves resources and improves data storage efficiency, I gathered my courage and decided to face the difficulty head-on.

So I typed out the code letter by letter following the examples, debugging and retrying over and over. When the program finally ran successfully, I felt a sense of achievement like never before!

It turns out that success really is the result of persistence. Only by keeping at it do you get the chance to see the light of victory.

Tomorrow I will keep studying hard, keep challenging myself, and go after more wins! Let's go!