当前位置: 首页 > news >正文

怎么做网站收录网站建设 的公司哪家好

怎么做网站收录,网站建设 的公司哪家好,网站开发工具链接服务器,嘉兴信息网站一、概念 1.1 Apache Flink 两种关系型 API Apache Flink 有两种关系型 API 来做流批统一处理#xff1a;Table API 和 SQL。 Table API 是用于 Scala 和 Java 语言的查询API#xff0c;它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。 Flink SQL 是…一、概念 1.1 Apache Flink 两种关系型 API Apache Flink 有两种关系型 API 来做流批统一处理Table API 和 SQL。 Table API 是用于 Scala 和 Java 语言的查询API它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。 Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。这两种 API 中的查询对于批DataSet和流DataStream的输入有相同的语义也会产生同样的计算结果。 Table API 和 SQL 两种 API 是紧密集成的以及 DataStream 和 DataSet API。你可以在这些 API 之间以及一些基于这些 API 的库之间轻松的切换。比如你可以先用 CEP 从 DataStream 中做模式匹配然后用 Table API 来分析匹配的结果或者你可以用 SQL 来扫描、过滤、聚合一个批式的表然后再跑一个 Gelly 图算法 来处理已经预处理好的数据。 注意Table API 和 SQL 现在还处于活跃开发阶段还没有完全实现所有的特性。不是所有的 [Table APISQL] 和 [流批] 的组合都是支持的。 1.2 动态表(Dynamic Tables) 动态表是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。 动态表是随时间变化的可以像查询静态批处理表一样查询它们。查询动态表将生成一个连续查询(Continuous Query)。一个连续查询永远不会终止结果会生成一个动态表。查询不断更新其(动态)结果表以反映其(动态)输入表上的更改。 需要注意的是连续查询的结果在语义上总是等价于以批处理模式在输入表快照上执行的相同查询的结果。 对动态表的一般处理过程 流-动态表-连续查询处理-动态表-流 二、导入Flink Table API依赖 pom.xml 中添加 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner-blink_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope /dependency dependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-scala_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope /dependency dependencygroupIdorg.apache.flink/groupIdartifactIdflink-csv/artifactIdversion${flink.version}/version /dependency dependencygroupIdorg.apache.flink/groupIdartifactIdflink-json/artifactIdversion${flink.version}/version /dependency !-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -- dependencygroupIdorg.apache.commons/groupIdartifactIdcommons-compress/artifactIdversion1.21/version /dependency三、表与DataStream的混合使用简单案例 package com.lyh.flink12;import com.lyh.bean.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row;//必须添加此类才能在表达式中运用$符号 import static org.apache.flink.table.api.Expressions.$;public class Table_Api_BasicUse {public static void main(String[] args) throws Exception {// 流运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//并行参数env.setParallelism(1);// 数据源DataStreamSourceWaterSensor waterSensorStream env.fromElements(new WaterSensor(sensor_1, 1000L, 10),new WaterSensor(sensor_1, 2000L, 20),new WaterSensor(sensor_2, 3000L, 30),new WaterSensor(sensor_1, 4000L, 40),new WaterSensor(sensor_1, 5000L, 50),new WaterSensor(sensor_2, 6000L, 60));// 创建表的执行环境StreamTableEnvironment TableEnv StreamTableEnvironment.create(env);// 创建表将流转换成动态表. 表的字段名从pojo的属性名自动抽取Table table TableEnv.fromDataStream(waterSensorStream);// 对动态表进行查询Table resultTable table.where($(id).isEqual(sensor_1)).select($(id),$(vc));//把动态表转化为流DataStreamRow dataStream TableEnv.toAppendStream(resultTable,Row.class);dataStream.print();env.execute();} }四、表到流的转换 动态表可以像普通数据库表一样通过 INSERT、UPDATE 和 DELETE 来不断修改。它可能是一个只有一行、不断更新的表也可能是一个 insert-only 的表没有 UPDATE 和 DELETE 修改或者介于两者之间的其他表。 在将动态表转换为流或将其写入外部系统时需要对这些更改进行编码。Flink的 Table API 和 SQL 支持三种方式来编码一个动态表的变化: Append-only 流 仅通过 INSERT 操作修改的动态表可以通过输出插入的行转换为流。 Retract 流 retract 流包含两种类型的 message add messages 和 retract messages 。通过将INSERT 操作编码为 add message、将 DELETE 操作编码为 retract message、将 UPDATE 操作编码为更新(先前)行的 retract message 和更新(新)行的 add message将动态表转换为 retract 流。下图显示了将动态表转换为 retract 流的过程。 Upsert 流 upsert 流包含两种类型的 message upsert messages 和delete messages。转换为 upsert 流的动态表需要(可能是组合的)唯一键。通过将 INSERT 和 UPDATE 操作编码为 upsert message将 DELETE 操作编码为 delete message 将具有唯一键的动态表转换为流。消费流的算子需要知道唯一键的属性以便正确地应用 message。与 retract 流的主要区别在于 UPDATE 操作是用单个 message 编码的因此效率更高。下图显示了将动态表转换为 upsert 流的过程。 请注意在将动态表转换为 DataStream 时只支持 append 流和 retract 流。 五、通过Connector声明读入数据 前面是先得到流, 再转成动态表, 其实动态表也可以直接连接到数据 5.1 File source // 创建表 //表的元数据信息 Schema schema new Schema().field(id, DataTypes.STRING()).field(ts, DataTypes.BIGINT()).field(vc, DataTypes.INT()); // 连接文件, 并创建一个临时表, 其实就是一个动态表 tableEnv.connect(new FileSystem().path(input/sensor.txt)).withFormat(new Csv().fieldDelimiter(,).lineDelimiter(\n)).withSchema(schema).createTemporaryTable(sensor); // 做成表对象, 然后对动态表进行查询 Table sensorTable tableEnv.from(sensor); Table resultTable sensorTable.groupBy($(id)).select($(id), $(id).count().as(cnt)); // 把动态表转换成流. 如果涉及到数据的更新, 要用到撤回流. 多个了一个boolean标记 DataStreamTuple2Boolean, Row resultStream tableEnv.toRetractStream(resultTable, Row.class); resultStream.print();5.2 Kafka Source // 创建表 // 表的元数据信息 Schema schema new Schema().field(id, DataTypes.STRING()).field(ts, DataTypes.BIGINT()).field(vc, DataTypes.INT()); // 连接文件, 并创建一个临时表, 其实就是一个动态表 tableEnv.connect(new Kafka().version(universal).topic(sensor).startFromLatest().property(group.id, bigdata).property(bootstrap.servers, hadoop162:9092,hadoop163:9092,hadoop164:9092)).withFormat(new Json()).withSchema(schema).createTemporaryTable(sensor); //对动态表进行查询 Table sensorTable tableEnv.from(sensor); Table resultTable sensorTable.groupBy($(id)).select($(id), $(id).count().as(cnt)); //把动态表转换成流. 如果涉及到数据的更新, 要用到撤回流. 多个了一个boolean标记 DataStreamTuple2Boolean, Row resultStream tableEnv.toRetractStream(resultTable, Row.class); resultStream.print();六、通过Connector声明写出数据 6.1 File Sink package com.atguigu.flink.java.chapter_11;import com.atguigu.flink.java.chapter_5.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Csv; import org.apache.flink.table.descriptors.FileSystem; import org.apache.flink.table.descriptors.Schema;import static org.apache.flink.table.api.Expressions.$;/*** Author lizhenchaoatguigu.cn* Date 2021/1/11 21:43*/ public class Flink02_TableApi_ToFileSystem {public static void main(String[] args) {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceWaterSensor waterSensorStream env.fromElements(new WaterSensor(sensor_1, 1000L, 10),new WaterSensor(sensor_1, 2000L, 20),new WaterSensor(sensor_2, 3000L, 30),new WaterSensor(sensor_1, 4000L, 40),new WaterSensor(sensor_1, 5000L, 50),new WaterSensor(sensor_2, 6000L, 60));// 1. 创建表的执行环境StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);Table sensorTable tableEnv.fromDataStream(waterSensorStream);Table resultTable sensorTable.where($(id).isEqual(sensor_1) ).select($(id), $(ts), $(vc));// 创建输出表Schema schema new Schema().field(id, DataTypes.STRING()).field(ts, DataTypes.BIGINT()).field(vc, DataTypes.INT());tableEnv.connect(new FileSystem().path(output/sensor_id.txt)).withFormat(new Csv().fieldDelimiter(|)).withSchema(schema).createTemporaryTable(sensor);// 把数据写入到输出表中resultTable.executeInsert(sensor);} }6.2 Kafka Sink package com.atguigu.flink.java.chapter_11;import com.atguigu.flink.java.chapter_5.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Json; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Schema;import static org.apache.flink.table.api.Expressions.$;/*** Author lizhenchaoatguigu.cn* Date 2021/1/11 21:43*/ public class Flink03_TableApi_ToKafka {public static void main(String[] args) {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceWaterSensor waterSensorStream env.fromElements(new WaterSensor(sensor_1, 1000L, 10),new WaterSensor(sensor_1, 2000L, 20),new WaterSensor(sensor_2, 3000L, 30),new WaterSensor(sensor_1, 4000L, 40),new WaterSensor(sensor_1, 5000L, 50),new WaterSensor(sensor_2, 6000L, 60));// 1. 创建表的执行环境StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);Table sensorTable tableEnv.fromDataStream(waterSensorStream);Table resultTable sensorTable.where($(id).isEqual(sensor_1) ).select($(id), $(ts), $(vc));// 创建输出表Schema schema new Schema().field(id, DataTypes.STRING()).field(ts, DataTypes.BIGINT()).field(vc, DataTypes.INT());tableEnv.connect(new Kafka().version(universal).topic(sink_sensor).sinkPartitionerRoundRobin().property(bootstrap.servers, hadoop162:9092,hadoop163:9092,hadoop164:9092)).withFormat(new Json()).withSchema(schema).createTemporaryTable(sensor);// 把数据写入到输出表中resultTable.executeInsert(sensor);} }七、基本使用 7.1 查询未注册的表 package com.lyh.flink12;import org.apache.flink.types.Row; import com.lyh.bean.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Connect_File_source {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceWaterSensor dataStreamSource env.fromElements(new WaterSensor(sensor_1, 1000L, 20),new WaterSensor(sensor_1, 2000L, 30),new WaterSensor(sensor_1, 3000L, 40),new WaterSensor(sensor_1, 4000L, 50),new WaterSensor(sensor_1, 5000L, 60));// 创建动态表环境StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// 使用SQL查询未注册的表// 从流中得到一个表Table inputTable tableEnv.fromDataStream(dataStreamSource);Table resultTable tableEnv.sqlQuery(select * from inputTable where id sensor_1);tableEnv.toAppendStream(resultTable, Row.class).print();env.execute();} }7.2 查询已注册的表 package com.lyh.flink12; import com.atguigu.flink.java.chapter_5.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row;public class Flink05_SQL_BaseUse_2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceWaterSensor waterSensorStream env.fromElements(new WaterSensor(sensor_1, 1000L, 10),new WaterSensor(sensor_1, 2000L, 20),new WaterSensor(sensor_2, 3000L, 30),new WaterSensor(sensor_1, 4000L, 40),new WaterSensor(sensor_1, 5000L, 50),new WaterSensor(sensor_2, 6000L, 60));StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// 使用sql查询一个已注册的表// 1. 从流得到一个表Table inputTable tableEnv.fromDataStream(waterSensorStream);// 2. 把注册为一个临时视图tableEnv.createTemporaryView(sensor, inputTable);// 3. 在临时视图查询数据, 并得到一个新表Table resultTable tableEnv.sqlQuery(select * from sensor where idsensor_1);// 4. 显示resultTable的数据tableEnv.toAppendStream(resultTable, Row.class).print();env.execute();} }7.3 Kafka到Kafka 使用sql从Kafka读数据, 并写入到Kafka中 package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Sql_kafka_kafka {public static void main(String[] args) {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);tableEnv.executeSql(create table source_sensor (id string, ts bigint, vc int) with( connector kafka, topic topic_source_sensor, properties.bootstrap.servers hadoop100:9029, properties.group.id atguigu, scan.startup.mode latest-offset, format json ));// 2. 注册SinkTable: sink_sensortableEnv.executeSql(create table sink_sensor(id string, ts bigint, vc int) with( connector kafka, topic topic_sink_sensor, properties.bootstrap.servers hadoop100:9029, format json ));// 3. 从SourceTable 查询数据, 并写入到 SinkTabletableEnv.executeSql(insert into sink_sensor select * from source_sensor where idsensor_1);} }
http://www.zqtcl.cn/news/6393/

相关文章:

  • 网络推广就找南昌莫非传媒温州seo排名
  • 国外设计公司网站欣赏沈阳男科医院好排行
  • 杭州网站推广技巧南通优普网站建设制作
  • 下载软件app排行榜北京网站建设 seo公司哪家好
  • 网站建设的行业新闻男和男做那个视频网站好
  • 怎么做蛋糕店的网站seo怎么发文章 seo发布工具
  • 怎么看网站有没有做百度推广安庆专业做淘宝网站
  • 网站开发与维护前景封面制作网站
  • 全国可信网站考研培训机构排名前五的机构
  • 国外交互设计网站欣赏搭建一个网站要多少
  • 本地的上海网站建设公网页制作免费教程
  • 登封网站制作自动采集更新网站源码
  • 太原手机模板建站公司变更登记申请书下载
  • 做地方门户网站赚钱吗广东省城乡建设部网站
  • sql2008做查询网站做网站的公司广州
  • 网站入口类型洞口网站开发公司推荐
  • 网站建设新发展wordpress上传预告片
  • yahoo不收录我的网站门户类网站前台
  • 鲜花网站有关建设生态建设网站
  • 一个网站怎么做流量统计湘潭响应式网站建设 速来磐石网络
  • 宜昌住房和城乡建设厅网站容桂微信网站建设
  • 外贸公司网站建设方案外贸网站wordpress
  • 盐城建设银行网站做美食网站的项目背景
  • 成都网站开发排名designer怎么做网站
  • 电子商务网站建设策划书范文苏州做网站公司
  • 怎吗做网站挣钱谁教我做啊谁会做网站啊
  • 黑龙江网站建设开发棋牌推广
  • 网站如何做免费的推广软件怎么做出来的
  • 江苏水利工程建设招投标网站wordpress 阿里oss
  • 在线建设房屋设计网站泉州做网站开发公司