当前位置: 首页 > news >正文

深圳网站设计建设公司洛阳做网站多少钱

深圳网站设计建设公司,洛阳做网站多少钱,建站程序大全,wordpress 图片展示插件Flink学习笔记 前言#xff1a;今天是学习 flink 的第 14 天啦#xff01;学习了 flink 高级特性和新特性之数据类型及 avro 序列化#xff0c;主要是解决大数据领域数据规范化写入和规范化读取的问题#xff0c;avro 数据结构可以节约存储空间#xff0c;本文中结合企业真…Flink学习笔记 前言今天是学习 flink 的第 14 天啦学习了 flink 高级特性和新特性之数据类型及 avro 序列化主要是解决大数据领域数据规范化写入和规范化读取的问题avro 数据结构可以节约存储空间本文中结合企业真实应用场景即 kafka 的读取和写入采用自定义序列化结合自己实验猜想和代码实践总结了很多自己的理解和想法希望和大家多多交流 Tips分享是快乐的源泉在我的博客里不仅有知识的海洋还有满满的正能量加持快来和我一起分享这份快乐吧 喜欢我的博客的话记得点个红心❤️和小关小注哦您的支持是我创作的动力 文章目录 Flink学习笔记四、Flink 高级特性和新特性4. 数据类型及序列化4.1 数据类型4.2 POJO 类型细节4.3 Avro优点介绍4.4 定义Avro Json格式4.5 使用 Java 自定义序列化到 Kfaka4.5.1 准备数据4.5.2 自定义Avro 序列化和反序列化4.5.3 创建生产者工具类4.5.4 创建消费者工具类4.5.5 运行程序 4.6 使用 Flink 自定义序列化到 Kafka4.6.1 准备数据4.6.2 自定义Avro 序列化和反序列化4.6.3 创建 Flink-source 类4.6.4 创建 Flink-sink 类4.6.5 运行程序 四、Flink 高级特性和新特性 4. 数据类型及序列化 4.1 数据类型 flink 支持的数据类型七种 4.2 POJO 类型细节 注意事项 该类需要有 public 修饰该类需要有 public 修饰的无参构造函数该类的所有no-static、no-transient字段必须是 public如果不是 public 则必须是有标准的 getter 和 setter该类的所有字段都必须是 flink 支持的数据类型 4.3 Avro优点介绍 Avro 是数据序列化系统支持大批量数据交换的应用。 支持二进制序列化方式性能好 / 效率高使用 JSON 描述。 动态语言友好RPC 远程调用支持同步和异步通信。 4.4 定义Avro Json格式 namespace要生成的目录type类型 avro 需要指定 recordname会自动生成的对象fields要指定的字段 注意: 创建的文件后缀名一定要叫 avsc而不是 avro 后缀使用 idea 生成 Order 对象 {namespace: cn.itcast.beans,type: record,name: OrderModel,fields: [{name: userId, type: string},{name: timestamp, type: long},{name: money, type: double},{name: category, type: string}] }注意由于在导入 pom 依赖的时候需要注意插件冲突注释掉以下依赖不然会一直爆错 !-- 这个会和 avro 冲突所以先注释一下-- !-- dependency-- !-- groupIdorg.apache.hive/groupId-- !-- artifactIdhive-exec/artifactId-- !-- version2.1.0/version-- !-- /dependency--快看一导入需要的依赖到 pom 文件中 dependencygroupIdorg.apache.avro/groupIdartifactIdavro/artifactIdversion1.8.2/version /dependency !-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -- dependencygroupIdorg.apache.flink/groupIdartifactIdflink-avro/artifactIdversion${flink.version}/version /dependency!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -- dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.5.1/version /dependency dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-streams/artifactIdversion2.5.1/version /dependency快看二导入需要的插件到 pom 文件中 !-- avro编译插件 -- plugingroupIdorg.apache.avro/groupIdartifactIdavro-maven-plugin/artifactIdversion1.8.2/versionexecutionsexecutionphasegenerate-sources/phasegoalsgoalschema/goal/goalsconfigurationsourceDirectory${project.basedir}/src/main/avro//sourceDirectoryoutputDirectory${project.basedir}/src/main/java//outputDirectory/configuration/execution/executions /plugin4.5 使用 Java 自定义序列化到 Kfaka 4.5.1 准备数据 order.csv user_001,1621718199,10.1,电脑 user_001,1621718201,14.1,手机 user_002,1621718202,82.5,手机 user_001,1621718205,15.6,电脑 user_004,1621718207,10.2,家电 user_001,1621718208,15.8,电脑 user_005,1621718212,56.1,电脑 user_002,1621718260,40.3,家电 user_001,1621718580,11.5,家居 user_001,1621718860,61.6,家居4.5.2 自定义Avro 序列化和反序列化 首先需要实现2个接口分别为 Serializer 和 Deserializer 分别是序列化和反序列化 package cn.itcast.day14.serialization_java;/*** author lql* time 2024-03-10 16:29:49* description TODO*/import cn.itcast.day14.beans.OrderModel; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter;import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Map; import org.apache.kafka.common.serialization.*;/*** 自定义序列化和反序列化*/ public class SimpleAvroSchemaJava implements SerializerOrderModel, DeserializerOrderModel {Overridepublic void configure(MapString, ? configs, boolean isKey) {}Overridepublic void close() {}Overridepublic byte[] serialize(String s, OrderModel order) {// 创建序列化执行器SpecificDatumWriterOrderModel writer new SpecificDatumWriterOrderModel(order.getSchema());// 创建一个流 用存储序列化后的二进制文件ByteArrayOutputStream out new ByteArrayOutputStream();// 创建二进制编码器BinaryEncoder encoder EncoderFactory.get().directBinaryEncoder(out, null);try {// 数据入都流中writer.write(order, encoder);} catch (IOException e) {e.printStackTrace();}return out.toByteArray();}Overridepublic OrderModel deserialize(String s, byte[] bytes) {// 用来保存结果数据OrderModel order new OrderModel();// 创建输入流用来读取二进制文件ByteArrayInputStream arrayInputStream new ByteArrayInputStream(bytes);// 创建输入序列化执行器SpecificDatumReaderOrderModel stockSpecificDatumReader new SpecificDatumReaderOrderModel(order.getSchema());//创建二进制解码器BinaryDecoder binaryDecoder DecoderFactory.get().directBinaryDecoder(arrayInputStream, null);try {// 数据读取order stockSpecificDatumReader.read(null, binaryDecoder);} catch (IOException e) {e.printStackTrace();}// 结果返回return order;} }4.5.3 创建生产者工具类 package cn.itcast.day14.serialization_java;import cn.itcast.day14.beans.OrderModel; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord;import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.util.ArrayList; import java.util.List; import java.util.Properties; /*** author lql* time 2024-03-10 16:33:31* description TODO*/ public class OrderProducerJava {public static void main(String[] args) {// 获取数据ListOrderModel data getData();System.out.println(data);try {// 创建配置文件Properties props new Properties();props.setProperty(bootstrap.servers, node1:9092);// 这里的健还是 string 序列化props.setProperty(key.serializer, org.apache.kafka.common.serialization.StringSerializer);// 这里的值需要指向自定义的序列化props.setProperty(value.serializer, cn.itcast.day14.serialization_java.SimpleAvroSchemaJava);// 创建kafka的生产者KafkaProducerString, OrderModel userBehaviorProducer new KafkaProducerString, OrderModel(props);// 循环遍历数据for (OrderModel orderModel : data) {ProducerRecordString, OrderModel producerRecord new ProducerRecordString, OrderModel(order, orderModel);userBehaviorProducer.send(producerRecord);System.out.println(数据写入成功data);Thread.sleep(1000);}} catch (InterruptedException e) {e.printStackTrace();}}public static ListOrderModel getData() {ArrayListOrderModel orderModels new ArrayListOrderModel();try {BufferedReader br new BufferedReader(new FileReader(new File(D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\order.csv)));String line ;while ((line br.readLine()) ! null) {String[] fields line.split(,);orderModels.add(new OrderModel(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]), fields[3]));}} catch (Exception e) {e.printStackTrace();}return orderModels;} }4.5.4 创建消费者工具类 package cn.itcast.day14.serialization_java;import cn.itcast.day14.beans.OrderModel; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays; import java.util.Properties; /*** author lql* time 2024-03-10 16:38:29* description TODO*/ public class OrderConsumerJava {public static void main(String[] args) {Properties prop new Properties();prop.put(bootstrap.servers, node1:9092);prop.put(group.id, order);prop.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);// 设置反序列化类为自定义的avro反序列化类prop.put(value.deserializer, cn.itcast.day14.serialization_java.SimpleAvroSchemaJava);KafkaConsumerString, OrderModel consumer new KafkaConsumerString, OrderModel(prop);consumer.subscribe(Arrays.asList(order));while (true) {// poll 方法用于从 kafka 中拉取数据ConsumerRecordsString, OrderModel poll consumer.poll(1000);for (ConsumerRecordString, OrderModel stringStockConsumerRecord : poll) {System.out.println(stringStockConsumerRecord.value());}}} }4.5.5 运行程序 # 首先启动zookeeper# 启动 kafka记得后台启动 后台:cd /export/servers/kafka_2.11-0.10.0.0nohup bin/kafka-server-start.sh config/server.properties 21 停止:cd /export/servers/kafka_2.11-0.10.0.0bin/kafka-server-stop.sh# 创建topic bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 1 --topic order # 模拟消费者 bin/kafka-console-consumer.sh --zookeeper node1:2181 --from-beginning --topic order结果 生产者打印 [{userId: user_001, timestamp: 1621718199, money: 10.1, category: 电脑}, {userId: user_001, timestamp: 1621718201, money: 14.1, category: 手机}, {userId: user_002, timestamp: 1621718202, money: 82.5, category: 手机}, {userId: user_001, timestamp: 1621718205, money: 15.6, category: 电脑}, {userId: user_004, timestamp: 1621718207, money: 10.2, category: 家电}, {userId: user_001, timestamp: 1621718208, money: 15.8, category: 电脑}, {userId: user_005, timestamp: 1621718212, money: 56.1, category: 电脑}, {userId: user_002, timestamp: 1621718260, money: 40.3, category: 家电}, {userId: user_001, timestamp: 1621718580, money: 11.5, category: 家居}, {userId: user_001, timestamp: 1621718860, money: 61.6, category: 家居}] 数据写入成功消费者打印 {userId: user_001, timestamp: 1621718199, money: 10.1, category: 电脑} {userId: user_001, timestamp: 1621718201, money: 14.1, category: 手机} {userId: user_002, timestamp: 1621718202, money: 82.5, category: 手机} {userId: user_001, timestamp: 1621718205, money: 15.6, category: 电脑} {userId: user_004, timestamp: 1621718207, money: 10.2, category: 家电} {userId: user_001, timestamp: 1621718208, money: 15.8, category: 电脑} {userId: user_005, timestamp: 1621718212, money: 56.1, category: 电脑} {userId: user_002, timestamp: 1621718260, money: 40.3, category: 家电} {userId: user_001, timestamp: 1621718580, money: 11.5, category: 家居} {userId: user_001, timestamp: 1621718860, money: 61.6, category: 家居}总结值的序列化需要指定自己定义的序列化。 4.6 使用 Flink 自定义序列化到 Kafka 4.6.1 准备数据 order.csv user_001,1621718199,10.1,电脑 user_001,1621718201,14.1,手机 user_002,1621718202,82.5,手机 user_001,1621718205,15.6,电脑 user_004,1621718207,10.2,家电 user_001,1621718208,15.8,电脑 user_005,1621718212,56.1,电脑 user_002,1621718260,40.3,家电 user_001,1621718580,11.5,家居 user_001,1621718860,61.6,家居4.6.2 自定义Avro 序列化和反序列化 首先需要实现2个接口分别为 SerializationSchema 和 DeserializationSchema 分别是序列化和反序列化 package cn.itcast.day14.serialization_flink;/*** author lql* time 2024-03-10 17:35:09* description TODO*/ import cn.itcast.day14.beans.OrderModel; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation;import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException;/*** 自定义序列化和反序列化*/ public class SimpleAvroSchemaFlink implements DeserializationSchemaOrderModel, SerializationSchemaOrderModel {Overridepublic byte[] serialize(OrderModel order) {// 创建序列化执行器SpecificDatumWriterOrderModel writer new SpecificDatumWriterOrderModel(order.getSchema());// 创建一个流 用存储序列化后的二进制文件ByteArrayOutputStream out new ByteArrayOutputStream();// 创建二进制编码器BinaryEncoder encoder EncoderFactory.get().directBinaryEncoder(out, null);try {// 数据入都流中writer.write(order, encoder);} catch (IOException e) {e.printStackTrace();}return out.toByteArray();}Overridepublic TypeInformationOrderModel getProducedType() {return TypeInformation.of(OrderModel.class);}Overridepublic OrderModel deserialize(byte[] bytes) throws IOException {// 用来保存结果数据OrderModel userBehavior new OrderModel();// 创建输入流用来读取二进制文件ByteArrayInputStream arrayInputStream new ByteArrayInputStream(bytes);// 创建输入序列化执行器SpecificDatumReaderOrderModel stockSpecificDatumReader new SpecificDatumReaderOrderModel(userBehavior.getSchema());//创建二进制解码器BinaryDecoder binaryDecoder DecoderFactory.get().directBinaryDecoder(arrayInputStream, null);try {// 数据读取userBehaviorstockSpecificDatumReader.read(null, binaryDecoder);} catch (IOException e) {e.printStackTrace();}// 结果返回return userBehavior;}Overridepublic boolean isEndOfStream(OrderModel userBehavior) {return false;} } 4.6.3 创建 Flink-source 类 package cn.itcast.day14.serialization_flink;/*** author lql* time 2024-03-10 17:37:15* description TODO*/ import cn.itcast.day14.beans.OrderModel; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import java.util.Properties;public class OrderProducerFlink {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString value env.readTextFile(D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\order.csv);DataStreamOrderModel orderModelDataStream value.map(row - {String[] fields row.split(,);return new OrderModel(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]), fields[3]);});Properties prop new Properties();prop.setProperty(bootstrap.servers, node1:9092);//4.连接KafkaFlinkKafkaProducerOrderModel producer new FlinkKafkaProducer(order,new SimpleAvroSchemaFlink(),prop);//5.将数据打入kafkaorderModelDataStream.addSink(producer);//6.执行任务env.execute();} }4.6.4 创建 Flink-sink 类 package cn.itcast.day14.serialization_flink;/*** author lql* time 2024-03-10 17:40:04* description TODO*/ import cn.itcast.day14.beans.OrderModel; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class OrderConsumerFlink {public static void main(String[] args) throws Exception {//1.构建流处理运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1); // 设置并行度1 方便后面测试// 2.设置kafka 配置信息Properties prop new Properties();prop.put(bootstrap.servers, node1:9092);prop.put(group.id, UserBehavior);prop.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);// 下面有提及SimpleAvroSchemaFlink上面就不需要指定了// 3.构建Kafka 连接器FlinkKafkaConsumer kafka new FlinkKafkaConsumerOrderModel(order, new SimpleAvroSchemaFlink(), prop);//4.设置Flink层最新的数据开始消费kafka.setStartFromLatest();//5.基于kafka构建数据源DataStreamOrderModel data env.addSource(kafka);//6.结果打印data.print();env.execute();} }4.6.5 运行程序 这里运用 Kafka-Tool 2.0.7 可视化工具工具包放在资源处啦大家感兴趣可以观看我上传的资源哟 总结 刚开始学习这个知识点时我真是感觉有些吃力觉得它太抽象、太难以理解了。但想到这是企业工作环境中必须掌握的技术能够为企业节省资源、提高数据存储效率我就鼓起勇气决定迎难而上。 于是我根据例子一个字母一个字母地敲下代码反复调试、尝试。终于当程序运行成功的那一刻我感到了前所未有的成就感 原来成功真的就是坚持的结果。只有当你坚持不懈地努力才能有机会看到胜利的曙光。 明天我也要继续努力学习不断挑战自己迎接更多的成功 加油
http://www.zqtcl.cn/news/333709/

相关文章:

  • 做结婚请柬网站有那些做网店哪个网站好
  • 做网站尽在美橙互联欧美简约风格网站设计
  • idea建设完整的网站微官网下载
  • 阿城区建设小学网站上海建设行政主管部门政务网站
  • 西丽网站建设网站怎样做才能有点击率
  • 网站建设图片大小建设部网站1667号公告
  • 做wps的网站赚钱网站建设中网站图片如何修改
  • 公司招商型网站建设怎么自己做网站挣钱
  • 红酒手机网站建设中视频自媒体注册
  • 免费网站2022年能用的网址青阳网站建设
  • 网站建设的开发方式知乎科技部网站建设合同范本
  • 兰州市建设厅官方网站做酒店的网站
  • 宠物店网站开发文档撰写洛阳市河阳建设工程有限公司网站
  • 毕业设计做网站应该学什么wordpress调用子分类
  • 怎么建网站做淘宝客用国外网站 图片做自媒体
  • 汕头建站模板搭建怎么制作有效网站
  • 学生个人网站作品怎么wordpress用的什么主题
  • 设计logo网站侵权吗知乎一键做单页网站
  • 网站服务器有什么用做视频网站视频存放问题
  • 影评网站建设可以免费发布招聘网站
  • 富阳做兼职的网站正邦设计上海分公司
  • 网站漏洞解决办法投资
  • wordpress网站如何网页设计实训总结3000字大学篇
  • 用ps怎么做网站导航条wordpress 开启缩略图
  • 网上销售型的企业网站为什么要域名备案
  • 唐山网站建设方案优化国内酷炫网站
  • 国外网站备案吗网站做一样没有侵权吧
  • 谷歌怎么建网站ps中怎样做网站轮播图片
  • 汕头有没有做网站廊坊宣传片制作公司
  • 百度快速收录网站有些人做网站不用钱的 对吗