当前位置: 首页 > news >正文

网站添加支付宝国际油价最新消息

网站添加支付宝,国际油价最新消息,商标设计软件免费版,网页制作个人介绍代码文章目录1.使用flink的原因#xff1a;2. Flink支持两种模式#xff1a;3. flink table api工作原理#xff1a;4. Flink table api 使用5. select语句flink table api#xff1a;6. 使用flink table api 创建table7. 使用flink table api 写流式数据输出到表或sink8.… 文章目录1.使用flink的原因2. Flink支持两种模式3. flink table api工作原理4. Flink table api 使用5. select语句flink table api6. 使用flink table api 创建table7. 使用flink table api 写流式数据输出到表或sink8. flink auto-scaling(自动伸缩)9. watermarks水印10. 聚合数据Aggregating Data11. 使用窗口聚合数据(using windows to aggregate data)12.flink 多表连接joining flink tables13.flink table api 代码1.使用flink的原因 real-timescalablereliableFully-managed 2. Flink支持两种模式 Batch processing package com.flink.wc;import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector;/*** DataSet api实现不推荐官方推荐直接使用DataStream api* 批处理字符分割*/ public class WordCountBatchDemo {public static void main(String[] args) throws Exception {//1.创建执行环境ExecutionEnvironment envExecutionEnvironment.getExecutionEnvironment();//2.读取数据从文件中读取DataSourceString lineDS env.readTextFile(D:\\ideawork\\javaBasics\\input\\word.txt);//3.切分、转换word,1FlatMapOperatorString, Tuple2String, Integer wordAndOne lineDS.flatMap(new FlatMapFunctionString, Tuple2String, Integer() {Overridepublic void flatMap(String value, CollectorTuple2String, Integer out) throws Exception {//3.1 按照空格进行切分单词String[] words value.split( );for (String word : words) {//3.2 将单词转换二元组(word,1)Tuple2String, Integer wordTuple2 Tuple2.of(word, 1);//3.3 使用collector向下游发送数据out.collect(wordTuple2);}}});//4.按照word分组按照tuple中第一个位置word的索引0,分组UnsortedGroupingTuple2String, Integer wordAndOneGroupby wordAndOne.groupBy(0);//传入索引为0//5.各分组内聚合,按照二元组中第二个元素的位置进行聚合AggregateOperatorTuple2String, Integer sum wordAndOneGroupby.sum(1);//6.输出sum.print();} } Stateful stream Processing package com.flink.wc;import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class WordCountStreamDemo {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//2.读取数据DataStreamSourceString lineDSenv.readTextFile(input/word.txt);//3.处理数据SingleOutputStreamOperatorTuple2String, Integer wordAndOneDS lineDS.flatMap(new FlatMapFunctionString, Tuple2String, Integer() {Overridepublic void flatMap(String value, CollectorTuple2String, Integer out) throws Exception {//按照空格进行切分String[] words value.split( );for (String word : words) {//数组转成二元组word,1Tuple2String, Integer wordAndOne Tuple2.of(word, 1);//通过采集器向下游发送数据out.collect(wordAndOne);}}});//3.2 分组KeyedStreamTuple2String, Integer, String wordAndOneKS wordAndOneDS.keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer value) throws Exception {return value.f0;}});//3.3 聚合,从tuple第二个元素位置进行聚合SingleOutputStreamOperatorTuple2String, Integer sumDS wordAndOneKS.sum(1);//4.输出数据sumDS.print();//5.执行触发启动逻辑env.execute();} } 3. flink table api工作原理 优化器、计划器决定table api如何在flink集群上执行 执行flink run命令运行剩下的代码执行由flink集群执行 通过Confluent 插件,把flink java代码转换为SQL然后交由云进行执行 4. Flink table api 使用 查询数据库中表数据可以通过读取索引index查询但是查询数据流只能从开始读取整个流 Flink table api 工作对象自定向下包含Catalog、database、table 一个Catalog底下有1个或多个database一个database底下有1个或多个table 调用方式 例子 一个环境对应一个catalog,一个集群对应一个databse,一个Topic对应一个table但是这块topic与table的关系有点不一定 语句理解 TableResult resultenv.from(ecommerce.marketplace.orders).select($(*)).execute();result.print(); 执行上面的java代码的效果与下面执行sql的效果等效 5. select语句flink table api //查询结果TableResult resultenv.from(ecommerce.marketplace.orders).select($(*),$(vin).as(win)).insertInto(all_cars).execute();CloseableIteratorRow rowsresult.collect();//迭代结果while (rows.hasNext()){Row rowrows.next();row.getField(vin);}env.from(cars).select($(*)).where($(color).isEqual(Blue)).insertInto(blue_cars);env.from(cars).select($(*)).where($(year).isGreaterOrEqual(2022)).insertInto(cars_from_2022_or_later);env.from(cars).select($(*)).where(and($(year).isGreaterOrEqual(1900),$(year).isLess(2000))).insertInto(cars_from_the_1900s);env.from(cars).select($(*)).where($(year).cast(DataTypes.STRING()).like(19%)).insertInto(cars_from_the_1900s); 备注说明flink中查询出来的数据一般很少打印输出一般直接插入一张表 查看定义的表结构 SHOW CREATE TABLE examples.marketplace.orders;查看flink是否正常启动 ps aux | grep flink 6. 使用flink table api 创建table package com.flink.wc;import org.apache.flink.table.api.*;public class FlinkTableApi2 {public static void main(String[] args) {//1.创建设置EnvironmentSettings settingsEnvironmentSettings.newInstance().inStreamingMode().build();//2.创建table环境TableEnvironment env TableEnvironment.create(settings);//创建schemaSchema schemaTestSchema.newBuilder().column(vin, DataTypes.STRING().notNull()).column(make, DataTypes.STRING().notNull()).column(model, DataTypes.STRING().notNull()).column(year,DataTypes.INT().notNull()).column(color,DataTypes.STRING().notNull()).build();//转换上面schema成json或avro或protobuf schemaTableDescriptor descriptorTableDescriptor.forConnector(confluents).schema(schemaTest).option(key.format,proto-registry)//注册使用proto格式,当然也可以使用json-registry或avro-registry.option(value.format,proto-registry).option(kafka.retention.time,7 days).option(scan.startup.mode,latest-offset).partitionedBy(make)//设置分隔的key为make.build();//创建表env.createTable(cars,descriptor);} } 数据格式化的格式 proto-registryjson-registryavro-registry flink 创建表的原理 创建表就是在创建 keyschema、value schema、topic;descriptor:描述如何这些资源被创建 flink table api 使用sql脚本创建表 //创建表方法2env.executeSql(CREATE TABLE cars (vin VARCHAR(2147483647) NOT NULL,make VARCHAR(2147483647) NOT NULL,model VARCHAR(2147483647) NOT NULL,year INT NOT NULL,color VARCHAR(2147483647) NOT NULL) DISTRIBUTED BY (vin) INTO 6 BUCKETS WITH (connector confluent,kafka.retention.time 7 days,scan.startup.mode latest-offset,key.format proto-registry,value.format proto-registry));7. 使用flink table api 写流式数据输出到表或sink 输出到table flink statement 上面的sink即java APP 当javaApp挂了出现的情况只要kafka topic存在可以把数据写到kafka flink table api 数据流写入表也可以是topics并输出结果到控制台: 上面语句解释 执行execute()方法就是在创建一个无界流这也就意味着进行insertInfo操作的时候就一直往表中无限插入。这个特别需要注意不能这样做不然会出现数据爆表 8. flink auto-scaling(自动伸缩) 在flink java应用中查询是一个单线程因此不能自动伸缩 confluent cloud可以让flink自动伸缩更加高效的利用资源 中断job:即使中断了应用无界流依然run,依然耗费资源。可以通过程序中断job: tableresult.getJobClient().get().cancel(); TableResult tableresultenv.from(ecommerce.marketplace.orders).select($(*),$(vin).as(win)).insertInto(all_cars).execute();tableresult.getJobClient().get().cancel();//中断job9. watermarks水印 kafka可以确保分区内有序但不确保跨多个分区有序这也就产生了事件到达无序问题 watermark的目标是提供一个最大的时间等候无序事件 kafka ---- timeststamp 自定义watermark 10. 聚合数据Aggregating Data 语句解释把查询汇总的结果最终写入topic: trips_overview中 flink可以把各个分区sum的结果进行最终的聚合Aggregate; 如果分区不够很容易出现数据偏移最佳实践是配置足够多的分区空间在扩展的时候不会出现太多偏移 groupBy 可以分隔进入的数据流成为多个数据流 distinct()去重 count 、 sum、average 通常是安全的并且可以完成最小的存储; distinct需要更多的存储区间于如何配置它 11. 使用窗口聚合数据(using windows to aggregate data) window API 允许flink对事件进行分组基于时间窗口 滚动窗口tumbling windows: 窗口固定无重叠 把上面的消息分隔成1个小时一个窗口 滚动窗口代码如下 package com.flink.wc;import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.Tumble;import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.lit;public class FlinkTableApiWindowAggregate {public static void main(String[] args) {//1.创建设置EnvironmentSettings settingsEnvironmentSettings.newInstance().inStreamingMode().build();//2.创建table环境TableEnvironment env TableEnvironment.create(settings);env.from(examples.marketpalce.orders).window(Tumble.over(lit(1).hours()//设置一个滚动窗口持续时间为1小时).on($($rowtime)//使用rowtime字段作为时间戳timestamp).as(window)//设置窗口名字为name).groupBy(//对查询记录进行分组$(customer_id),$(window)).select(//查询计算窗口数据$(customer_id),$(window).start().as(window_start),$(window).end().as(window_end),$(price).sum().as(total_spend)).execute();} }fixed-size window(固定大小窗口) 滑动窗口sliding window:窗口与先前的窗口重叠 跳跃窗口(hopping window): 它是一种基于时间的窗口机制用于将持续产生的流数据划分成多个有固定长度的时间段窗口并按固定的 “跳跃间隔”hop interval向前移动。 核心特点窗口有固定的长度window size和固定的跳跃间隔。跳跃间隔通常小于窗口长度因此相邻窗口之间会存在重叠部分。 例如若窗口长度为 10 分钟跳跃间隔为 5 分钟那么第一个窗口覆盖 [0:00, 0:10)第二个窗口覆盖 [0:05, 0:15)以此类推两个窗口重叠 5 分钟。与其他窗口的区别 滚动窗口tumbling window跳跃间隔等于窗口长度无重叠滑动窗口sliding window逻辑上与跳跃窗口类似有时可视为 “跳跃窗口” 的另一种表述取决于具体语境强调窗口随时间滑动的特性。 跳跃窗口常用于需要高频次统计一段连续时间数据的场景例如 “每 5 分钟计算过去 10 分钟的订单总量”。 滑动窗口代码 package com.flink.wc;import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Slide; import org.apache.flink.table.api.TableEnvironment;import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.lit;/*** 滑动窗口代码*/ public class FlinkTableApiWindowAggregate2 {public static void main(String[] args) {//1.创建设置EnvironmentSettings settingsEnvironmentSettings.newInstance().inStreamingMode().build();//2.创建table环境TableEnvironment env TableEnvironment.create(settings);env.from(examples.marketpalce.orders).window(Slide.over(lit(1).hours()//设置一个滑动窗口持续时间为1小时).every(lit(30).minutes()//设置每30分钟计算一个结果).on($($rowtime)//使用rowtime字段作为时间戳timestamp).as(window)//设置窗口名字为name).groupBy(//对查询记录进行分组$(customer_id),$(window)).select(//查询计算窗口数据$(customer_id),$(window).start().as(window_start),$(window).end().as(window_end),$(price).sum().as(total_spend)).execute();} } 12.flink 多表连接joining flink tables 内连接inner join 左连接全连接 interval join区间连接在区间时间内的查询有效一旦超过区间时间就舍弃窗口外无效状态 需要append-only 流以避免级联更新问题 13.flink table api 代码 创建表代码 package com.flink.wc;import org.apache.flink.table.api.*; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.types.DataType;import java.util.List;/*** A table program example that illustrates how to create a table backed by a Kafka topic.** pNOTE: This example requires write access to a Kafka cluster. Fill out the given variables* below with target catalog/database if this is fine for you.*/ public class CreatingTables {// Fill this with an environment you have write access tostatic final String TARGET_CATALOG ;// Fill this with a Kafka cluster you have write access tostatic final String TARGET_DATABASE ;// Fill this with names of the Kafka Topics you want to createstatic final String TARGET_TABLE1 MyExampleTable1;static final String TARGET_TABLE2 MyExampleTable2;public static void main(String[] args) {//1.创建设置EnvironmentSettings settingsEnvironmentSettings.newInstance().inStreamingMode().build();settings.getConfiguration().set(TableConfigOptions.LOCAL_TIME_ZONE,UTC);//table.local-time-zone//2.创建table环境TableEnvironment env TableEnvironment.create(settings);env.useCatalog(TARGET_CATALOG);env.useDatabase(TARGET_DATABASE);System.out.println(Creating table... TARGET_TABLE1);// Create a table programmatically:// The table...// - is backed by an equally named Kafka topic// - stores its payload in JSON// - will reference two Schema Registry subjects for Kafka message key and value// - is distributed across 4 Kafka partitions based on the Kafka message key user_idenv.createTable(TARGET_TABLE1,TableDescriptor.forManaged().schema(Schema.newBuilder().column(user_id, DataTypes.STRING()).column(name, DataTypes.STRING()).column(email, DataTypes.STRING()).build()).partitionedBy(user_id)//.distributedBy(4, user_id).option(kafka.retention.time, 0).option(key.format, json-registry).option(value.format, json-registry).build());// Alternatively, the call above could also be executed with SQLenv.executeSql(CREATE TABLE IF NOT EXISTS TARGET_TABLE1 (\n user_id STRING,\n name STRING,\n email STRING\n )\n DISTRIBUTED BY HASH(user_id) INTO 4 BUCKETS\n WITH (\n kafka.retention.time 0 ms,\n key.format json-registry,\n value.format json-registry\n ));System.out.println(Creating table... TARGET_TABLE2);// The schema builders can be quite useful to avoid manual schema work. You can adopt schema// from other tables, massage the schema, and/or add additional columnsDataType productsRow env.from(examples.marketplace.products).getResolvedSchema().toPhysicalRowDataType();ListString columnNames DataType.getFieldNames(productsRow);ListDataType columnTypes DataType.getFieldDataTypes(productsRow);// In this example, the table will get all names/data types from the table products// plus an additionalColumn columnenv.createTable(TARGET_TABLE2,TableDescriptor.forManaged().schema(Schema.newBuilder().fromFields(columnNames, columnTypes).column(additionalColumn, DataTypes.STRING()).build()).build());} } flink table pipeline(flink 流水线)代码 package com.flink.wc;import org.apache.flink.table.api.*; import org.apache.flink.table.api.config.TableConfigOptions;import java.util.List; import java.util.concurrent.ExecutionException;import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.concat; import static org.apache.flink.table.api.Expressions.row;/*** A table program example that demos how to pipe data into a table or multiple tables.** pNOTE: This example requires write access to a Kafka cluster. Fill out the given variables* below with target catalog/database if this is fine for you.** pALSO NOTE: The example submits an unbounded background statement. Make sure to stop the* statement in the Web UI afterward to clean up resources.*/ public class Example_05_TablePipelines {// Fill this with an environment you have write access tostatic final String TARGET_CATALOG ;// Fill this with a Kafka cluster you have write access tostatic final String TARGET_DATABASE ;// Fill this with names of the Kafka Topics you want to createstatic final String TARGET_TABLE1 PricePerProduct;static final String TARGET_TABLE2 PricePerCustomer;public static void main(String[] args) throws ExecutionException, InterruptedException {//1.创建设置EnvironmentSettings settingsEnvironmentSettings.newInstance().inStreamingMode().build();settings.getConfiguration().set(TableConfigOptions.LOCAL_TIME_ZONE,UTC);//table.local-time-zone//2.创建table环境TableEnvironment env TableEnvironment.create(settings);env.useCatalog(TARGET_CATALOG);env.useDatabase(TARGET_DATABASE);System.out.println(Creating tables... List.of(TARGET_TABLE1, TARGET_TABLE2));// Create two helper tables that will be filled with data from examplesenv.createTable(TARGET_TABLE1,TableDescriptor.forManaged().schema(Schema.newBuilder().column(product_id, DataTypes.STRING().notNull()).column(price, DataTypes.DOUBLE().notNull()).build()).build());env.createTable(TARGET_TABLE2,TableDescriptor.forManaged().schema(Schema.newBuilder().column(customer_id, DataTypes.INT().notNull()).column(price, DataTypes.DOUBLE().notNull()).build()).build());System.out.println(Executing table pipeline synchronous...);// A TablePipeline describes a flow of data from source(s) to sink.// In this case, from values to a Kafka-backed target tableTablePipeline pipeline env.fromValues(row(1408, 27.71), row(1062, 94.39), row(42, 80.01)).insertInto(TARGET_TABLE1);// One can explain or execute a pipelinepipeline.printExplain();// Execution happens async by default, use await() to attach to the execution in case all// sources are finite (i.e. bounded).// For infinite (i.e. unbounded) sources, waiting for completion would not make much sense.pipeline.execute().await();System.out.println(Executing statement set asynchronous...);// The API supports more than a single sink, you can also fan out to different tables while// reading from a table once using a StatementSet:StatementSet statementSet env.createStatementSet().add(env.from(examples.marketplace.orders).select($(product_id), $(price)).insertInto(TARGET_TABLE1)).add(env.from(examples.marketplace.orders).select($(customer_id), $(price)).insertInto(TARGET_TABLE2));// Executes a statement set that splits the orders table into two tables,// a product_id | price table and a customer_id | price onestatementSet.execute();System.out.println(Reading merged data written by background statement...);// For this example, we read both target tables in again and union them into one output to// verify that the data arrivesTable targetTable1 env.from(TARGET_TABLE1).select(concat($(product_id), event in , TARGET_TABLE1));Table targetTable2 env.from(TARGET_TABLE2).select(concat($(customer_id).cast(DataTypes.STRING()), event in ,TARGET_TABLE2));targetTable1.unionAll(targetTable2).as(status).execute().print();} } flink values与datatype : package com.flink.wc;import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.expressions.Expression; import org.apache.flink.types.Row;import java.math.BigDecimal; import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Map;import static org.apache.flink.table.api.Expressions.*;/** A table program example to create mock data. */ public class Example_06_ValuesAndDataTypes {public static void main(String[] args) {//1.创建设置EnvironmentSettings settingsEnvironmentSettings.newInstance().inStreamingMode().build();settings.getConfiguration().set(TableConfigOptions.LOCAL_TIME_ZONE,UTC);//table.local-time-zone//2.创建table环境TableEnvironment env TableEnvironment.create(settings);// Values for each data type can be created...// (1) with Java objectsRow row new Row(17);// BOOLEANrow.setField(0, true);// STRING / CHAR / VARCHARrow.setField(1, Alice);// DATErow.setField(2, LocalDate.of(2024, 12, 23));// TIMErow.setField(3, LocalTime.of(13, 45, 59));// TIMESTAMProw.setField(4, LocalDateTime.of(2024, 12, 23, 13, 45, 59));// TIMESTAMP_LTZrow.setField(5, Instant.ofEpochMilli(1734957959000L));// BIGINTrow.setField(6, 42L);// INTrow.setField(7, 42);// SMALLINTrow.setField(8, (short) 42);// TINYINTrow.setField(9, (byte) 42);// DOUBLErow.setField(10, 42.0);// FLOATrow.setField(11, 42.0f);// DECIMALrow.setField(12, new BigDecimal(123.4567));// BYTES / BINARY / VARBINARYrow.setField(13, new byte[] {1, 2, 3});// ARRAYrow.setField(14, new Integer[] {1, 2, 3});// MAProw.setField(15, Map.ofEntries(Map.entry(k1, v1), Map.entry(k2, v2)));// ROWrow.setField(16, Row.of(Bob, true));Table fromObjects env.fromValues(row);// (2) with Table API expressionsExpression rowExpr row(// VARCHAR(200)lit(Alice).cast(DataTypes.VARCHAR(200)),// ARRAYarray(1, 2, 3),// MAPmap(k1, v1, k2, v2),// ROWrow(Bob, true),// NULLnullOf(DataTypes.INT()));Table fromExpressions env.fromValues(rowExpr);// (3) with SQL expressionsTable fromSql env.sqlQuery(VALUES (// VARCHAR(200) CAST(Alice AS VARCHAR(200)), // BYTES x010203, // ARRAY ARRAY[1, 2, 3], // MAP MAP[k1, v1, k2, v2, k3, v3], // ROW (Bob, true), // NULL CAST(NULL AS INT), // DATE DATE 2024-12-23, // TIME TIME 13:45:59.000, // TIMESTAMP TIMESTAMP 2024-12-23 13:45:59.000, // TIMESTAMP_LTZ TO_TIMESTAMP_LTZ(1734957959000, 3) ));// Verify the derived data types and valuesSystem.out.println(Table from objects:);fromObjects.printSchema();fromObjects.execute().print();System.out.println(Table from Table API expressions:);fromExpressions.printSchema();fromExpressions.execute().print();System.out.println(Table from SQL expressions:);fromSql.printSchema();fromSql.execute().print();} } flink confluent集成与部署代码 package com.flink.wc;import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator;import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream;import static org.apache.flink.table.api.Expressions.*;/*** An example that illustrates how to embed a table program into a CI/CD pipeline for continuous* testing and rollout.** pBecause we cannot rely on production data in this example, the program sets up some* Kafka-backed tables with data during the {code setup} phase.** pAfterward, the program can operate in two modes: one for integration testing ({code test}* phase) and one for deployment ({code deploy} phase).** pA CI/CD workflow could execute the following:** pre* export EXAMPLE_JAR./target/flink-table-api-java-examples-1.0.jar* export EXAMPLE_CLASSio.confluent.flink.examples.table.Example_08_IntegrationAndDeployment* java -jar $EXAMPLE_JAR $EXAMPLE_CLASS setup* java -jar $EXAMPLE_JAR $EXAMPLE_CLASS test* java -jar $EXAMPLE_JAR $EXAMPLE_CLASS deploy* /pre** pNOTE: This example requires write access to a Kafka cluster. Fill out the given variables* below with target catalog/database if this is fine for you.** pALSO NOTE: The example submits an unbounded background statement. Make sure to stop the* statement in the Web UI afterward to clean up resources.** pThe complete CI/CD workflow performs the following steps:** ol* liCreate Kafka table ProductsMock and VendorsPerBrand.* liFill Kafka table ProductsMock with data from marketplace examples table products.* liTest the given SQL on a subset of data in ProductsMock with the help of dynamic options.* liDeploy an unbounded version of the tested SQL that write into VendorsPerBrand.* /ol*/ public class Example_08_IntegrationAndDeployment {// Fill this with an environment you have write access tostatic final String TARGET_CATALOG ;// Fill this with a Kafka cluster you have write access tostatic final String TARGET_DATABASE ;// Fill this with names of the Kafka Topics you want to createstatic final String SOURCE_TABLE ProductsMock;static final String TARGET_TABLE VendorsPerBrand;// The following SQL will be tested on a finite subset of data before// it gets deployed to production.// In production, it will run on unbounded input.// The %s parameterizes the SQL for testing.static final String SQL SELECT brand, COUNT(*) AS vendors FROM ProductsMock %s GROUP BY brand;public static void main(String[] args) throws Exception {if (args.length 0) {throw new IllegalArgumentException(No mode specified. Possible values are setup, test, or deploy.);}//1.创建设置EnvironmentSettings settingsEnvironmentSettings.newInstance().inStreamingMode().build();settings.getConfiguration().set(TableConfigOptions.LOCAL_TIME_ZONE,UTC);//table.local-time-zone//2.创建table环境TableEnvironment env TableEnvironment.create(settings);env.useCatalog(TARGET_CATALOG);env.useDatabase(TARGET_DATABASE);String mode args[0];switch (mode) {case setup:setupProgram(env);break;case test:testProgram(env);break;case deploy:deployProgram(env);break;default:throw new IllegalArgumentException(Unknown mode: mode);}}// --------------------------------------------------------------------------------------------// Setup Phase// --------------------------------------------------------------------------------------------private static void setupProgram(TableEnvironment env) throws Exception {System.out.println(Running setup...);System.out.println(Creating table... SOURCE_TABLE);// Create a mock table that has exactly the same schema as the example products table.// The LIKE clause is very convenient for this task which is why we use SQL here.// Since we use little data, a bucket of 1 is important to satisfy the scan.bounded.mode// during testing.env.executeSql(String.format(CREATE TABLE IF NOT EXISTS %s\n DISTRIBUTED INTO 1 BUCKETS\n LIKE examples.marketplace.products (EXCLUDING OPTIONS),SOURCE_TABLE));System.out.println(Start filling table...);// Let Flink copy generated data into the mock table. Note that the statement is unbounded// and submitted as a background statement by default.TableResult pipelineResult env.from(examples.marketplace.products).select($(*)).insertInto(SOURCE_TABLE).execute();System.out.println(Waiting for at least 200 elements in table...);// We start a second Flink statement for monitoring how the copying progressesTableResult countResult env.from(SOURCE_TABLE).select(lit(1).count()).as(c).execute();// This waits for the condition to be met:try (CloseableIteratorRow iterator countResult.collect()) {while (iterator.hasNext()) {Row row iterator.next();long count row.getFieldAs(c);if (count 200L) {System.out.println(200 elements reached. Stopping...);break;}}}// By using a closable iterator, the foreground statement will be stopped automatically when// the iterator is closed. But the background statement still needs a manual stop.ConfluentTools.stopStatement(pipelineResult);//countResult.getJobClient().get().cancel()System.out.println(Creating table... TARGET_TABLE);// Create a table for storing the results after deployment.env.executeSql(String.format(CREATE TABLE IF NOT EXISTS %s \n (brand STRING, vendors BIGINT, PRIMARY KEY(brand) NOT ENFORCED)\n DISTRIBUTED INTO 1 BUCKETS,TARGET_TABLE));}// --------------------------------------------------------------------------------------------// Test Phase// --------------------------------------------------------------------------------------------private static void testProgram(TableEnvironment env) {System.out.println(Running test...);// Dynamic options allow influencing parts of a table scan. In this case, they define a// range (from start offset 0 to end offset 100) how to read from Kafka. Effectively,// they make the table bounded. If all tables are finite, the statement can terminate.// This allows us to run checks on the result.String dynamicOptions /* OPTIONS(\n scan.startup.mode specific-offsets,\n scan.startup.specific-offsets partition: 0, offset: 0,\n scan.bounded.mode specific-offsets,\n scan.bounded.specific-offsets partition: 0, offset: 100\n ) */;System.out.println(Requesting test data...);TableResult result env.executeSql(String.format(SQL, dynamicOptions));IteratorRow rows result.collect();ListRow listsnew ArrayList();while (rows.hasNext()){Row rowrows.next();lists.add(row);}System.out.println(Test data:\n lists.stream().map(Row::toString).collect(Collectors.joining(\n)));// Use the testing framework of your choice and add checks to verify the// correctness of the test databoolean testSuccessful lists.stream().map(r - r.StringgetFieldAs(brand)).anyMatch(brand - brand.equals(Apple));if (testSuccessful) {System.out.println(Success. Ready for deployment.);} else {throw new IllegalStateException(Test was not successful);}}// --------------------------------------------------------------------------------------------// Deploy Phase// --------------------------------------------------------------------------------------------private static void deployProgram(TableEnvironment env) {System.out.println(Running deploy...);// It is possible to give a better statement name for deployment but make sure that the name// is unique across environment and region.String statementName vendors-per-brand- UUID.randomUUID();env.getConfig().set(client.statement-name, statementName);// Execute the SQL without dynamic options.// The result is unbounded and piped into the target table.TableResult insertIntoResult env.sqlQuery(String.format(SQL, )).insertInto(TARGET_TABLE).execute();// The API might add suffixes to manual statement names such as -sql or -api.// For the final submitted name, use the provided tools.String finalName ConfluentTools.getStatementName(insertIntoResult);System.out.println(Statement has been deployed as: finalName);} }
http://www.zqtcl.cn/news/762541/

相关文章:

  • 无锡网站制作排名软件工程公司
  • 做网站国内好的服务器美食网站建设项目规划书
  • 三亚市住房和城乡建设厅网站江西电信网站备案
  • 联谊会总结网站建设对外宣传如何在家做电商
  • 360建站系统徐州建设银行网上银行个人网站
  • 网站域名在哪里备案石家庄站规模
  • 重庆南川网站制作公司电话工会网站群建设
  • 深圳高端建设网站忘了网站链接怎么做
  • 郑州做网站报价wordpress中文4.8
  • 网站维护费用一年多少跨境电商平台网站建设广州
  • 辽宁网站制作公司网店装修流程
  • html5可以做交互网站吗打开网站说建设中是什么问题?
  • 彩票网站开发制作需要什么wordpress 在线预览
  • 外贸平台app衡水seo排名
  • 怎样做网站表白墙东莞商城网站推广建设
  • 郑州郑州网站建设河南做网站公司哪家好爱站长尾词挖掘工具
  • dede网站地图文章变量网站qq 微信分享怎么做
  • 越南做网站网站建设以及运营方面
  • 广西建网站哪家好网站关闭与域名备案
  • 网站开发版本号婚庆网站建设策划案费用预算
  • 厦门建设网站制作中山市哪家公司做网站
  • 网站路径wordpress制作电商网站
  • 江西网站开发哪家专业装饰设计公司网站
  • 企业网站策划实训Wordpress 主题简化
  • 做网站点击挣钱不兰州工程建设信息网站
  • 网站说服力 营销...免费看片网站
  • 深圳招聘网站大全制作网站软件下载
  • 网站建设说明哈尔滨网站建设渠道
  • 一 网站建设管理基本情况设计类的网站
  • wordpress产品编辑如何优化wordpress