当前位置: 首页 > news >正文

哪里有网站建设中心页面设计布局

哪里有网站建设中心,页面设计布局,莱芜网站建设服务,网站降权怎么恢复Flink SQL 来源#xff1a;B站尚硅谷 sql-client准备 Table API和SQL是最上层的API#xff0c;在Flink中这两种API被集成在一起#xff0c;SQL执行的对象也是Flink中的表#xff08;Table#xff09;#xff0c;所以我们一般会认为它们是一体的。Flink是批流统一的处理…Flink SQL 来源B站尚硅谷 sql-client准备 Table API和SQL是最上层的API在Flink中这两种API被集成在一起SQL执行的对象也是Flink中的表Table所以我们一般会认为它们是一体的。Flink是批流统一的处理框架无论是批处理DataSet API还是流处理DataStream API在上层应用中都可以直接使用Table API或者SQL来实现这两种API对于一张表执行相同的查询操作得到的结果是完全一样的。 SQL API 是基于 SQL 标准的 Apache Calcite 框架实现的可通过纯 SQL 来开发和运行一个Flink 任务。具体的API调用可以随时关注官网的更新变化。 基于yarn-session模式 1启动Flink /opt/module/flink-1.17.0/bin/yarn-session.sh -d2启动Flink的sql-client /opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session常用配置 1结果显示模式 #默认table还可以设置为tableau、changelog SET sql-client.execution.result-modetableau;3执行环境 SET execution.runtime-modestreaming; #默认streaming也可以设置batch4默认并行度 SET parallelism.default1;5设置状态TTL SET table.exec.state.ttl1000;6通过sql文件初始化 1创建sql文件 vim conf/sql-client-init.sqlSET sql-client.execution.result-modetableau; CREATE DATABASE mydatabase;2启动时指定sql文件 /opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session -i conf/sql-client-init.sql流处理中的表 我们可以将关系型表/SQL与流处理做一个对比如表所示。 关系型表/SQL流处理处理的数据对象字段元组的有界集合字段元组的无限序列查询Query对数据的访问可以访问到完整的数据输入无法访问到所有数据必须“持续”等待流式输入查询终止条件生成固定大小的结果集后终止永不停止根据持续收到的数据不断更新查询结果 可以看到其实关系型表和SQL主要就是针对批处理设计的这和流处理有着天生的隔阂。 动态表和持续查询 流处理面对的数据是连续不断的这导致了流处理中的“表”跟我们熟悉的关系型数据库中的表完全不同而基于表执行的查询操作也就有了新的含义。 1动态表Dynamic Tables 当流中有新数据到来初始的表中会插入一行而基于这个表定义的SQL查询就应该在之前的基础上更新结果。这样得到的表就会不断地动态变化被称为“动态表”Dynamic Tables。 动态表是Flink在Table API和SQL中的核心概念它为流数据处理提供了表和SQL支持。我们所熟悉的表一般用来做批处理面向的是固定的数据集可以认为是“静态表”而动态表则完全不同它里面的数据会随时间变化。2持续查询Continuous Query 动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化因此基于它定义的SQL查询也不可能执行一次就得到最终结果。这样一来我们对动态表的查询也就永远不会停止一直在随着新数据的到来而继续执行。这样的查询就被称作“持续查询”Continuous Query。对动态表定义的查询操作都是持续查询而持续查询的结果也会是一个动态表。 由于每次数据到来都会触发查询操作因此可以认为一次查询面对的数据集就是当前输入动态表中收到的所有数据。这相当于是对输入动态表做了一个“快照”snapshot当作有限数据集进行批处理流式数据的到来会触发连续不断的快照查询像动画一样连贯起来就构成了“持续查询”。 持续查询的步骤如下 1流stream被转换为动态表dynamic table 2对动态表进行持续查询continuous query生成新的动态表 3生成的动态表被转换成流。 这样只要API将流和动态表的转换封装起来我们就可以直接在数据流上执行SQL查询用处理表的方式来做流处理了。 将流转换成动态表 如果把流看作一张表那么流中每个数据的到来都应该看作是对表的一次插入Insert操作会在表的末尾添加一行数据。因为流是连续不断的而且之前的输出结果无法改变、只能在后面追加所以我们其实是通过一个只有插入操作insert-only的更新日志changelog流来构建一个表。 例如当用户点击事件到来时就对应着动态表中的一次插入Insert操作每条数据就是表中的一行随着插入更多的点击事件得到的动态表将不断增长。 用SQL持续查询 1更新Update查询 我们在代码中定义了一个SQL查询。 Table urlCountTable tableEnv.sqlQuery(SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user);当原始动态表不停地插入新的数据时查询得到的urlCountTable会持续地进行更改。由于count数量可能会叠加增长因此这里的更改操作可以是简单的插入Insert也可以是对之前数据的更新Update。这种持续查询被称为更新查询Update Query更新查询得到的结果表如果想要转换成DataStream必须调用toChangelogStream()方法。 2追加Append查询 上面的例子中查询过程用到了分组聚合结果表中就会产生更新操作。如果我们执行一个简单的条件查询结果表中就会像原始表EventTable一样只有插入Insert操作了。 Table aliceVisitTable tableEnv.sqlQuery(SELECT url, user FROM EventTable WHERE user Cary);这样的持续查询就被称为追加查询Append Query它定义的结果表的更新日志changelog流中只有INSERT操作。 由于窗口的统计结果是一次性写入结果表的所以结果表的更新日志流中只会包含插入INSERT操作而没有更新UPDATE操作。所以这里的持续查询依然是一个追加Append查询。结果表result如果转换成DataStream可以直接调用toDataStream()方法。 将动态表转换为流 与关系型数据库中的表一样动态表也可以通过插入Insert、更新Update和删除Delete操作进行持续的更改。将动态表转换为流或将其写入外部系统时就需要对这些更改操作进行编码通过发送编码消息的方式告诉外部系统要执行的操作。在Flink中Table API和SQL支持三种编码方式 仅追加Append-only流 仅通过插入Insert更改来修改的动态表可以直接转换为“仅追加”流。这个流中发出的数据其实就是动态表中新增的每一行。撤回Retract流 撤回流是包含两类消息的流添加add消息和撤回retract消息。 具体的编码规则是INSERT插入操作编码为add消息DELETE删除操作编码为retract消息而UPDATE更新操作则编码为被更改行的retract消息和更新后行新行的add消息。这样我们可以通过编码后的消息指明所有的增删改操作一个动态表就可以转换为撤回流了。 更新插入Upsert流 更新插入流中只包含两种类型的消息更新插入upsert消息和删除delete消息。 所谓的“upsert”其实是“update”和“insert”的合成词所以对于更新插入流来说INSERT插入操作和UPDATE更新操作统一被编码为upsert消息而DELETE删除操作则被编码为delete消息。 需要注意的是在代码里将动态表转换为DataStream时只支持仅追加append-only和撤回retract流我们调用toChangelogStream()得到的其实就是撤回流。而连接到外部系统时则可以支持不同的编码方法这取决于外部系统本身的特性。 时间属性 在Table API和SQL中会给表单独提供一个逻辑上的时间字段专门用来在表处理程序中指示时间。 所以所谓的时间属性time attributes其实就是每个表模式结构schema的一部分。它可以在创建表的DDL里直接定义为一个字段也可以在DataStream转换成表时定义。一旦定义了时间属性它就可以作为一个普通字段引用并且可以在基于时间的操作中使用。 时间属性的数据类型必须为TIMESTAMP它的行为类似于常规时间戳可以直接访问并且进行计算。 按照时间语义的不同可以把时间属性的定义分成事件时间event time和处理时间processing time两种情况。 事件时间 事件时间属性可以在创建表DDL中定义增加一个字段通过WATERMARK语句来定义事件时间属性。具体定义方式如下 CREATE TABLE EventTable(user STRING,url STRING,ts TIMESTAMP(3),WATERMARK FOR ts AS ts - INTERVAL 5 SECOND ) WITH (... );这里我们把ts字段定义为事件时间属性而且基于ts设置了5秒的水位线延迟。 时间戳类型必须是 TIMESTAMP 或者TIMESTAMP_LTZ 类型。但是时间戳一般都是秒或者是毫秒BIGINT 类型这种情况可以通过如下方式转换 ts BIGINT, time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),处理时间 在定义处理时间属性时必须要额外声明一个字段专门用来保存当前的处理时间。 在创建表的DDLCREATE TABLE语句中可以增加一个额外的字段通过调用系统内置的PROCTIME()函数来指定当前的处理时间属性。 CREATE TABLE EventTable(user STRING,url STRING,ts AS PROCTIME() ) WITH (... );DDLData Definition Language数据定义 数据库 1创建数据库 1语法 CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name[COMMENT database_comment]WITH (key1val1, key2val2, ...)2案例 CREATE DATABASE db_flink;2查询数据库 1查询所有数据库 SHOW DATABASES2查询当前数据库 SHOW CURRENT DATABASE3修改数据库 ALTER DATABASE [catalog_name.]db_name SET (key1val1, key2val2, ...)4删除数据库 DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]RESTRICT删除非空数据库会触发异常。默认启用 CASCADE删除非空数据库也会删除所有相关的表和函数。 DROP DATABASE db_flink2;5切换当前数据库 USE database_name;表 1创建表 1语法 CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name({ physical_column_definition | metadata_column_definition | computed_column_definition }[ , ...n][ watermark_definition ][ table_constraint ][ , ...n])[COMMENT table_comment][PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]WITH (key1val1, key2val2, ...)[ LIKE source_table [( like_options )] | AS select_query ]① physical_column_definition 物理列是数据库中所说的常规列。其定义了物理介质中存储的数据中字段的名称、类型和顺序。其他类型的列可以在物理列之间声明但不会影响最终的物理列的读取。 ② metadata_column_definition 元数据列是 SQL 标准的扩展允许访问数据源本身具有的一些元数据。元数据列由 METADATA 关键字标识。例如我们可以使用元数据列从Kafka记录中读取和写入时间戳用于基于时间的操作这个时间戳不是数据中的某个时间戳字段而是数据写入 Kafka 时Kafka 引擎给这条数据打上的时间戳标记。connector和format文档列出了每个组件可用的元数据字段。 CREATE TABLE MyTable (user_id BIGINT,name STRING,record_time TIMESTAMP_LTZ(3) METADATA FROM timestamp ) WITH (connector kafka... );如果自定义的列名称和 Connector 中定义 metadata 字段的名称一样 FROM xxx 子句可省略 CREATE TABLE MyTable ( user_id BIGINT, name STRING, timestamp TIMESTAMP_LTZ(3) METADATA ) WITH ( connector kafka ... );如果自定义列的数据类型和 Connector 中定义的 metadata 字段的数据类型不一致程序运行时会自动 cast强转但是这要求两种数据类型是可以强转的。 CREATE TABLE MyTable ( user_id BIGINT, name STRING, -- 将时间戳强转为 BIGINT timestamp BIGINT METADATA ) WITH ( connector kafka ... );默认情况下Flink SQL planner 认为 metadata 列可以读取和写入。然而在许多情况下外部系统提供的只读元数据字段比可写字段多。因此可以使用VIRTUAL关键字排除元数据列的持久化(表示只读)。 CREATE TABLE MyTable (timestamp BIGINT METADATA, offset BIGINT METADATA VIRTUAL,user_id BIGINT,name STRING, ) WITH (connector kafka... );③ computed_column_definition 计算列是使用语法column_name AS computed_column_expression生成的虚拟列。 计算列就是拿已有的一些列经过一些自定义的运算生成的新列在物理上并不存储在表中只能读不能写。列的数据类型从给定的表达式自动派生无需手动声明。 CREATE TABLE MyTable (user_id BIGINT,price DOUBLE,quantity DOUBLE,cost AS price * quanitity ) WITH (connector kafka... );④ 定义Watermark Flink SQL 提供了几种 WATERMARK 生产策略 严格升序WATERMARK FOR rowtime_column AS rowtime_column。 Flink 任务认为时间戳只会越来越大也不存在相等的情况只要相等或者小于之前的就认为是迟到的数据。递增WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND 。 一般基本不用这种方式。如果设置此类则允许有相同的时间戳出现。有界无序 WATERMARK FOR rowtime_column AS rowtime_column – INTERVAL ‘string’ timeUnit 。 此类策略就可以用于设置最大乱序时间假如设置为 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘5’ SECOND 则生成的是运行 5s 延迟的Watermark。一般都用这种 Watermark 生成策略此类 Watermark 生成策略通常用于有数据乱序的场景中而对应到实际的场景中数据都是会存在乱序的所以基本都使用此类策略。 ⑤ PRIMARY KEY 主键约束表明表中的一列或一组列是唯一的并且它们不包含NULL值。主键唯一地标识表中的一行只支持 not enforced。 CREATE TABLE MyTable ( user_id BIGINT, name STRING, PARYMARY KEY(user_id) not enforced ) WITH ( connector kafka ... );⑥ PARTITIONED BY 创建分区表 ⑦ with语句 用于创建表的表属性用于指定外部存储系统的元数据信息。配置属性时表达式key1val1的键和值都应该是字符串字面值。如下是Kafka的映射表 CREATE TABLE KafkaTable ( user_id BIGINT, name STRING, ts TIMESTAMP(3) METADATA FROM timestamp ) WITH ( connector kafka, topic user_behavior, properties.bootstrap.servers localhost:9092, properties.group.id testGroup, scan.startup.mode earliest-offset, format csv )一般 with 中的配置项由 Flink SQL 的 Connector链接外部存储的连接器 来定义每种 Connector 提供的with 配置项都是不同的。 ⑧ LIKE 用于基于现有表的定义创建表。此外用户可以扩展原始表或排除表的某些部分。 可以使用该子句重用(可能还会覆盖)某些连接器属性或者向外部定义的表添加水印。 CREATE TABLE Orders (user BIGINT,product STRING,order_time TIMESTAMP(3) ) WITH ( connector kafka,scan.startup.mode earliest-offset );CREATE TABLE Orders_with_watermark (-- Add watermark definitionWATERMARK FOR order_time AS order_time - INTERVAL 5 SECOND ) WITH (-- Overwrite the startup-modescan.startup.mode latest-offset ) LIKE Orders;⑨ AS select_statementCTAS 在一个create-table-as-select (CTAS)语句中还可以通过查询的结果创建和填充表。CTAS是使用单个命令创建数据并向表中插入数据的最简单、最快速的方法。 CREATE TABLE my_ctas_table WITH (connector kafka,... ) AS SELECT id, name, age FROM source_table WHERE mod(id, 10) 0;注意:CTAS有以下限制: 暂不支持创建临时表。目前还不支持指定显式列。还不支持指定显式水印。目前还不支持创建分区表。目前还不支持指定主键约束。 2简单建表示例 CREATE TABLE test(id INT, ts BIGINT, vc INT ) WITH ( connector print );CREATE TABLE test1 (value STRING ) LIKE test;2查看表 1查看所有表 SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE sql_like_pattern ]如果没有指定数据库则从当前数据库返回表。 LIKE子句中sql pattern的语法与MySQL方言的语法相同: %匹配任意数量的字符甚至零字符%匹配一个’%字符。_只匹配一个字符_只匹配一个’_字符 2查看表信息 { DESCRIBE | DESC } [catalog_name.][db_name.]table_name3修改表 1修改表名 ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name2修改表属性 ALTER TABLE [catalog_name.][db_name.]table_name SET (key1val1, key2val2, ...)4删除表 DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name查询 DataGen Print 1创建数据生成器源表 CREATE TABLE source ( id INT, ts BIGINT, vc INT ) WITH ( connector datagen, rows-per-second1, fields.id.kindrandom, fields.id.min1, fields.id.max10, fields.ts.kindsequence, fields.ts.start1, fields.ts.end1000000, fields.vc.kindrandom, fields.vc.min1, fields.vc.max100 );CREATE TABLE sink (id INT, ts BIGINT, vc INT ) WITH ( connector print );2查询源表 select * from source3插入sink表并查询 INSERT INTO sink select * from source; select * from sink;With子句 WITH提供了一种编写辅助语句的方法以便在较大的查询中使用。这些语句通常被称为公共表表达式(Common Table Expression, CTE)可以认为它们定义了仅为一个查询而存在的临时视图。 1语法 WITH with_item_definition [ , ... ] SELECT ... FROM ...;with_item_defintion:with_item_name (column_name[, ...n]) AS ( select_query )2案例 WITH source_with_total AS (SELECT id, vc10 AS totalFROM source )SELECT id, SUM(total) FROM source_with_total GROUP BY id;SELECT WHERE 子句 1语法 SELECT select_list FROM table_expression [ WHERE boolean_expression ]2案例 SELECT * FROM source SELECT id, vc 10 FROM source-- 自定义 Source 的数据 SELECT id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price)SELECT vc 10 FROM source WHERE id 10SELECT DISTINCT 子句 用作根据 key 进行数据去重 SELECT DISTINCT vc FROM source对于流查询计算查询结果所需的状态可能无限增长。状态大小取决于不同行数。可以设置适当的状态生存时间(TTL)的查询配置以防止状态过大。但是这可能会影响查询结果的正确性。如某个 key 的数据过期从状态中删除了那么下次再来这么一个 key由于在状态中找不到就又会输出一遍。 分组聚合 SQL中一般所说的聚合我们都很熟悉主要是通过内置的一些聚合函数来实现的比如SUM()、MAX()、MIN()、AVG()以及COUNT()。它们的特点是对多条输入数据进行计算得到一个唯一的值属于“多对一”的转换。比如我们可以通过下面的代码计算输入数据的个数 select COUNT(*) from source;而更多的情况下我们可以通过GROUP BY子句来指定分组的键key从而对数据按照某个字段做一个分组统计。 SELECT vc, COUNT(*) as cnt FROM source GROUP BY vc;这种聚合方式就叫作“分组聚合”group aggregation。想要将结果表转换成流或输出到外部系统必须采用撤回流retract stream或更新插入流upsert stream的编码方式如果在代码中直接转换成DataStream打印输出需要调用toChangelogStream()。 分组聚合既是SQL原生的聚合查询也是流处理中的聚合操作这是实际应用中最常见的聚合方式。当然使用的聚合函数一般都是系统内置的如果希望实现特殊需求也可以进行自定义。 1group聚合案例 CREATE TABLE source1 ( dim STRING, user_id BIGINT, price BIGINT, row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), WATERMARK FOR row_time AS row_time - INTERVAL 5 SECOND ) WITH ( connector datagen, rows-per-second 10, fields.dim.length 1, fields.user_id.min 1, fields.user_id.max 100000, fields.price.min 1, fields.price.max 100000 );CREATE TABLE sink1 ( dim STRING, pv BIGINT, sum_price BIGINT, max_price BIGINT, min_price BIGINT, uv BIGINT, window_start bigint ) WITH ( connector print );insert into sink1 select dim, count(*) as pv, sum(price) as sum_price, max(price) as max_price, min(price) as min_price, -- 计算 uv 数 count(distinct user_id) as uv, cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start from source1 group by dim, -- UNIX_TIMESTAMP得到秒的时间戳将秒级别时间戳 / 60 转化为 1min cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)2多维分析 Group 聚合也支持 Grouping sets 、Rollup 、Cube如下案例是Grouping sets SELECTsupplier_id , rating , product_id , COUNT(*) FROM ( VALUES(supplier1, product1, 4),(supplier1, product2, 3),(supplier2, product3, 3),(supplier2, product4, 4) ) -- 供应商id、产品id、评级 AS Products(supplier_id, product_id, rating) GROUP BY GROUPING SETS((supplier_id, product_id, rating),(supplier_id, product_id),(supplier_id, rating),(supplier_id),(product_id, rating),(product_id),(rating),() );分组窗口聚合 从1.13版本开始分组窗口聚合已经标记为过时鼓励使用更强大、更有效的窗口TVF聚合在这里简单做个介绍。 直接把窗口自身作为分组key放在GROUP BY之后的所以也叫“分组窗口聚合”。SQL查询的分组窗口是通过 GROUP BY 子句定义的。类似于使用常规 GROUP BY 语句的查询窗口分组语句的 GROUP BY 子句中带有一个窗口函数为每个分组计算出一个结果。 SQL中只支持基于时间的窗口不支持基于元素个数的窗口。 分组窗口函数描述TUMBLE(time_attr, interval)定义一个滚动窗口。滚动窗口把行分配到有固定持续时间 interval 的不重叠的连续窗口。比如5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间批处理、流处理或处理时间流处理上。HOP(time_attr, interval, interval)定义一个跳跃的时间窗口在 Table API 中称为滑动窗口。滑动窗口有一个固定的持续时间 第二个 interval 参数 以及一个滑动的间隔第一个 interval 参数 。若滑动间隔小于窗口的持续时间滑动窗口则会出现重叠因此行将会被分配到多个窗口中。比如一个大小为 15 分组的滑动窗口其滑动间隔为 5 分钟将会把每一行数据分配到 3 个 15 分钟的窗口中。滑动窗口可以定义在事件时间批处理、流处理或处理时间流处理上。SESSION(time_attr, interval)定义一个会话时间窗口。会话时间窗口没有一个固定的持续时间但是它们的边界会根据 interval 所定义的不活跃时间所确定即一个会话时间窗口在定义的间隔时间内没有时间出现该窗口会被关闭。例如时间窗口的间隔时间是 30 分钟当其不活跃的时间达到30分钟后若观测到新的记录则会启动一个新的会话时间窗口否则该行数据会被添加到当前的窗口且若在 30 分钟内没有观测到新纪录这个窗口将会被关闭。会话时间窗口可以使用事件时间批处理、流处理或处理时间流处理。 1准备数据 CREATE TABLE ws (id INT,vc INT,pt AS PROCTIME(), --处理时间et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间WATERMARK FOR et AS et - INTERVAL 5 SECOND --watermark ) WITH (connector datagen,rows-per-second 10,fields.id.min 1,fields.id.max 3,fields.vc.min 1,fields.vc.max 100 );2滚动窗口示例时间属性字段窗口长度 select id, TUMBLE_START(et, INTERVAL 5 SECOND) wstart, TUMBLE_END(et, INTERVAL 5 SECOND) wend, sum(vc) sumVc from ws group by id, TUMBLE(et, INTERVAL 5 SECOND);3滑动窗口时间属性字段滑动步长窗口长度 select id, HOP_START(pt, INTERVAL 3 SECOND,INTERVAL 5 SECOND) wstart, HOP_END(pt, INTERVAL 3 SECOND,INTERVAL 5 SECOND) wend,sum(vc) sumVc from ws group by id, HOP(et, INTERVAL 3 SECOND,INTERVAL 5 SECOND);4会话窗口时间属性字段会话间隔 select id, SESSION_START(et, INTERVAL 5 SECOND) wstart, SESSION_END(et, INTERVAL 5 SECOND) wend, sum(vc) sumVc from ws group by id, SESSION(et, INTERVAL 5 SECOND);窗口表值函数TVF聚合 对比GroupWindowTVF窗口更有效和强大。包括 提供更多的性能优化手段支持GroupingSets语法可以在window聚合中使用TopN提供累积窗口 对于窗口表值函数窗口本身返回的是就是一个表所以窗口会出现在FROM后面GROUP BY后面的则是窗口新增的字段window_start和window_end FROM TABLE( 窗口类型(TABLE 表名, DESCRIPTOR(时间字段),INTERVAL时间…) ) GROUP BY [window_start,][window_end,] --可选1滚动窗口 SELECT window_start, window_end, id , SUM(vc) sumVC FROM TABLE(TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL 5 SECONDS)) GROUP BY window_start, window_end, id;2滑动窗口 要求 窗口长度滑动步长的整数倍底层会优化成多个小滚动窗口 SELECT window_start, window_end, id , SUM(vc) sumVC FROM TABLE(HOP(TABLE ws, DESCRIPTOR(et), INTERVAL 5 SECONDS , INTERVAL 10 SECONDS)) GROUP BY window_start, window_end, id;3累积窗口 累积窗口会在一定的统计周期内进行累积计算。累积窗口中有两个核心的参数最大窗口长度max window size和累积步长step。所谓的最大窗口长度其实就是我们所说的“统计周期”最终目的就是统计这段时间内的数据。 其实就是固定窗口间隔内提前触发的的滚动窗口 其实就是 Tumble Window early-fire 的一个事件时间的版本。例如从每日零点到当前这一分钟绘制累积 UV其中 10:00 时的 UV 表示从 00:00 到 10:00 的 UV 总数。 累积窗口可以认为是首先开一个最大窗口大小的滚动窗口然后根据用户设置的触发的时间间隔将这个滚动窗口拆分为多个窗口这些窗口具有相同的窗口起点和不同的窗口终点。 注意 窗口最大长度 累积步长的整数倍 SELECT window_start, window_end, id , SUM(vc) sumVC FROM TABLE(CUMULATE(TABLE ws, DESCRIPTOR(et), INTERVAL 2 SECONDS , INTERVAL 6 SECONDS)) GROUP BY window_start, window_end, id;4grouping sets多维分析 SELECT window_start, window_end, id , SUM(vc) sumVC FROM TABLE(TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL 5 SECONDS)) GROUP BY window_start, window_end, rollup( (id) ) -- cube( (id) ) -- grouping sets( (id),() ) ;Over 聚合 OVER聚合为一系列有序行的每个输入行计算一个聚合值。与GROUP BY聚合相比OVER聚合不会将每个组的结果行数减少为一行。相反OVER聚合为每个输入行生成一个聚合值。 可以在事件时间或处理时间以及指定为时间间隔、或行计数的范围内定义Over windows。 1语法 SELECTagg_func(agg_col) OVER ([PARTITION BY col1[, col2, ...]]ORDER BY time_colrange_definition),... FROM ...ORDER BY必须是时间戳列事件时间、处理时间只能升序PARTITION BY标识了聚合窗口的聚合粒度range_definition这个标识聚合窗口的聚合数据范围在 Flink 中有两种指定数据范围的方式。第一种为按照行数聚合第二种为按照时间区间聚合 2案例 1按照时间区间聚合 统计每个传感器前10秒到现在收到的水位数据条数。 SELECT id, et, vc,count(vc) OVER (PARTITION BY idORDER BY etRANGE BETWEEN INTERVAL 10 SECOND PRECEDING AND CURRENT ROW) AS cnt FROM ws也可以用WINDOW子句来在SELECT外部单独定义一个OVER窗口,可以多次使用 SELECT id, et, vc, count(vc) OVER w AS cnt, sum(vc) OVER w AS sumVC FROM ws WINDOW w AS (PARTITION BY idORDER BY etRANGE BETWEEN INTERVAL 10 SECOND PRECEDING AND CURRENT ROW )2按照行数聚合 统计每个传感器前5条到现在数据的平均水位 SELECT id, et, vc,avg(vc) OVER (PARTITION BY idORDER BY etROWS BETWEEN 5 PRECEDING AND CURRENT ROW ) AS avgVC FROM ws也可以用WINDOW子句来在SELECT外部单独定义一个OVER窗口 SELECT id, et, vc, avg(vc) OVER w AS avgVC, count(vc) OVER w AS cnt FROM ws WINDOW w AS (PARTITION BY idORDER BY etROWS BETWEEN 5 PRECEDING AND CURRENT ROW )特殊语法 —— TOP-N 目前在Flink SQL中没有能够直接调用的TOP-N函数而是提供了稍微复杂些的变通实现方法是固定写法特殊支持的over用法。 1语法 SELECT [column_list] FROM ( SELECT [column_list], ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum FROM table_name) WHERE rownum N [AND conditions]ROW_NUMBER() 标识 TopN 排序子句PARTITION BY col1[, col2…] 标识分区字段代表按照这个 col 字段作为分区粒度对数据进行排序取 topN比如下述案例中的 partition by key 就是根据需求中的搜索关键词key做为分区ORDER BY col1 [asc|desc][, col2 [asc|desc]…] 标识 TopN 的排序规则是按照哪些字段、顺序或逆序进行排序可以不是时间字段也可以降序TopN特殊支持WHERE rownum N 这个子句是一定需要的只有加上了这个子句Flink 才能将其识别为一个TopN 的查询其中 N 代表 TopN 的条目数[AND conditions] 其他的限制条件也可以加上 2案例 取每个传感器最高的3个水位值 select id,et,vc,rownum from (select id,et,vc,row_number() over(partition by id order by vc desc ) as rownumfrom ws ) where rownum3;特殊语法 —— Deduplication去重 去重也即上文介绍到的TopN 中 row_number 1 的场景但是这里有一点不一样在于其排序字段一定是时间属性列可以降序不能是其他非时间属性的普通列。 在 row_number 1 时如果排序字段是普通列 planner 会翻译成 TopN 算子如果是时间属性列 planner 会翻译成 Deduplication这两者最终的执行算子是不一样的Deduplication 相比 TopN 算子专门做了对应的优化性能会有很大提升。可以从webui看出是翻译成哪种算子。 如果是按照时间属性字段降序表示取最新一条会造成不断的更新保存最新的一条。如果是升序表示取最早的一条不用去更新性能更好。 1语法 SELECT [column_list] FROM ( SELECT [column_list], ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr [asc|desc]) AS rownum FROM table_name) WHERE rownum 12案例 对每个传感器的水位值去重 select id,et,vc,rownum from (select id,et,vc,row_number() over(partition by id,vc order by et ) as rownumfrom ws ) where rownum1;联结Join查询 在Flink SQL中同样支持各种灵活的联结Join查询操作的对象是动态表。 在流处理中动态表的Join对应着两条数据流的Join操作。Flink SQL中的联结查询大体上也可以分为两类SQL原生的联结查询方式和流处理中特有的联结查询。 常规联结查询 常规联结Regular Join是SQL中原生定义的Join方式是最通用的一类联结操作。它的具体语法与标准SQL的联结完全相同通过关键字JOIN来联结两个表后面用关键字ON来指明联结条件。 与标准SQL一致Flink SQL的常规联结也可以分为内联结INNER JOIN和外联结OUTER JOIN区别在于结果中是否包含不符合联结条件的行。 Regular Join 包含以下几种以 L 作为左流中的数据标识 R 作为右流中的数据标识 Inner JoinInner Equal Join流任务中只有两条流 Join 到才输出输出 [L, R]Left JoinOuter Equal Join流任务中左流数据到达之后无论有没有 Join 到右流的数据都会输出Join 到输出 [L, R] 没 Join 到输出 [L, null] 如果右流之后数据到达之后发现左流之前输出过没有 Join 到的数据则会发起回撤流先输出 -[L, null] 然后输出 [L, R]Right JoinOuter Equal Join有 Left Join 一样左表和右表的执行逻辑完全相反Full JoinOuter Equal Join流任务中左流或者右流的数据到达之后无论有没有 Join 到另外一条流的数据都会输出对右流来说Join 到输出 [L, R] 没 Join 到输出 [null, R] 对左流来说Join 到输出 [L, R] 没 Join 到输出 [L, null] 。如果一条流的数据到达之后发现之前另一条流之前输出过没有 Join 到的数据则会发起回撤流左流数据到达为例回撤 -[null, R] 输出[L, R] 右流数据到达为例回撤 -[L, null] 输出 [L, R] Regular Join 的注意事项 实时 Regular Join 可以不是等值 join 。等值 join 和 非等值 join 区别在于 等值 join数据 shuffle 策略是 Hash会按照 Join on 中的等值条件作为 id 发往对应的下游 非等值 join 数据 shuffle 策略是 Global所有数据发往一个并发按照非等值条件进行关联流的上游是无限的数据所以要做到关联的话Flink 会将两条流的所有数据都存储在 State 中所以 Flink 任务的 State 会无限增大因此你需要为 State 配置合适的 TTL以防止 State 过大。 再准备一张表用于join CREATE TABLE ws1 (id INT,vc INT,pt AS PROCTIME(), --处理时间et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间WATERMARK FOR et AS et - INTERVAL 0.001 SECOND --watermark ) WITH (connector datagen,rows-per-second 1,fields.id.min 3,fields.id.max 5,fields.vc.min 1,fields.vc.max 100 );1等值内联结INNER Equi-JOIN 内联结用INNER JOIN来定义会返回两表中符合联接条件的所有行的组合也就是所谓的笛卡尔积Cartesian product。目前仅支持等值联结条件。 SELECT * FROM ws INNER JOIN ws1 ON ws.id ws1.id2等值外联结OUTER Equi-JOIN 与内联结类似外联结也会返回符合联结条件的所有行的笛卡尔积另外还可以将某一侧表中找不到任何匹配的行也单独返回。Flink SQL支持左外LEFT JOIN、右外RIGHT JOIN和全外FULL OUTER JOIN分别表示会将左侧表、右侧表以及双侧表中没有任何匹配的行返回。 具体用法如下 SELECT * FROM ws LEFT JOIN ws1 ON ws.id ws1.idSELECT * FROM ws RIGHT JOIN ws1 ON ws.id ws1.idSELECT * FROM ws FULL OUTER JOIN ws1 ON ws.id ws.id这部分知识与标准SQL中是完全一样的。 间隔联结查询 我们曾经学习过DataStream API中的双流Join包括窗口联结window join和间隔联结interval join。两条流的Join就对应着SQL中两个表的Join这是流处理中特有的联结方式。目前Flink SQL还不支持窗口联结而间隔联结则已经实现。 间隔联结Interval Join返回的同样是符合约束条件的两条中数据的笛卡尔积。只不过这里的“约束条件”除了常规的联结条件外还多了一个时间间隔的限制。具体语法有以下要点 两表的联结 间隔联结不需要用JOIN关键字直接在FROM后将要联结的两表列出来就可以用逗号分隔。这与标准SQL中的语法一致表示一个“交叉联结”Cross Join会返回两表中所有行的笛卡尔积。联结条件 联结条件用WHERE子句来定义用一个等值表达式描述。交叉联结之后再用WHERE进行条件筛选效果跟内联结INNER JOIN … ON …非常类似。时间间隔限制 我们可以在WHERE子句中联结条件后用AND追加一个时间间隔的限制条件做法是提取左右两侧表中的时间字段然后用一个表达式来指明两者需要满足的间隔限制。具体定义方式有下面三种这里分别用ltime和rtime表示左右表中的时间字段 1ltime rtime 2ltime rtime AND ltime rtime INTERVAL ‘10’ MINUTE 3ltime BETWEEN rtime - INTERVAL ‘10’ SECOND AND rtime INTERVAL ‘5’ SECOND SELECT * FROM ws,ws1 WHERE ws.id ws1. id AND ws.et BETWEEN ws1.et - INTERVAL 2 SECOND AND ws1.et INTERVAL 2 SECOND维表联结查询 Lookup Join 其实就是维表 Join实时获取外部缓存的 JoinLookup 的意思就是实时查找。 上面说的这几种 Join 都是流与流之间的 Join而 Lookup Join 是流与 RedisMysqlHBase 这种外部存储介质的 Join。仅支持处理时间字段。 表A JOIN 维度表名 FOR SYSTEM_TIME AS OF 表A.proc_time AS 别名 ON xx.字段别名.字段比如维表在mysql维表join的写法如下: CREATE TABLE Customers (id INT,name STRING,country STRING,zip STRING ) WITH (connector jdbc,url jdbc:mysql://hadoop102:3306/customerdb,table-name customers ); -- order表每来一条数据都会去mysql的customers表查找维度数据SELECT o.order_id, o.total, c.country, c.zip FROM Orders AS oJOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS cON o.customer_id c.id;Order by 和 limit 1order by 支持 Batch\Streaming但在实时任务中一般用的非常少。 实时任务中Order By 子句中必须要有时间属性字段并且必须写在最前面且为升序。 SELECT * FROM ws ORDER BY et, id desc2limit SELECT * FROM ws LIMIT 3SQL Hints 在执行查询时可以在表名后面添加SQL Hints来临时修改表属性对当前job生效。 select * from ws1/* OPTIONS(rows-per-second10)*/;集合操作 1UNION 和 UNION ALL UNION将集合合并并且去重 UNION ALL将集合合并不做去重。 (SELECT id FROM ws) UNION (SELECT id FROM ws1); (SELECT id FROM ws) UNION ALL (SELECT id FROM ws1);2Intersect 和 Intersect All Intersect交集并且去重 Intersect ALL交集不做去重 (SELECT id FROM ws) INTERSECT (SELECT id FROM ws1); (SELECT id FROM ws) INTERSECT ALL (SELECT id FROM ws1);3Except 和 Except All Except差集并且去重 Except ALL差集不做去重 (SELECT id FROM ws) EXCEPT (SELECT id FROM ws1); (SELECT id FROM ws) EXCEPT ALL (SELECT id FROM ws1);上述 SQL 在流式任务中如果一条左流数据先来了没有从右流集合数据中找到对应的数据时会直接输出当右流对应数据后续来了之后会下发回撤流将之前的数据給撤回。这也是一个回撤流 4In 子查询 In 子查询的结果集只能有一列 SELECT id, vc FROM ws WHERE id IN ( SELECT id FROM ws1 )上述 SQL 的 In 子句和之前介绍到的 Inner Join 类似。并且 In 子查询也会涉及到大状态问题要注意设置 State 的 TTL。 系统函数 查看系统函数的官方文档 https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/systemfunctions/系统函数System Functions也叫内置函数Built-in Functions是在系统中预先实现好的功能模块。我们可以通过固定的函数名直接调用实现想要的转换操作。Flink SQL提供了大量的系统函数几乎支持所有的标准SQL中的操作这为我们使用SQL编写流处理程序提供了极大的方便。 Flink SQL中的系统函数又主要可以分为两大类标量函数Scalar Functions和聚合函数Aggregate Functions。 1标量函数Scalar Functions 标量函数指的就是只对输入数据做转换操作、返回一个值的函数。 标量函数是最常见、也最简单的一类系统函数数量非常庞大很多在标准SQL中也有定义。所以我们这里只对一些常见类型列举部分函数做一个简单概述具体应用可以查看官网的完整函数列表。 比较函数Comparison Functions 比较函数其实就是一个比较表达式用来判断两个值之间的关系返回一个布尔类型的值。这个比较表达式可以是用 、、 等符号连接两个值也可以是用关键字定义的某种判断。例如 1value1 value2 判断两个值相等 2value1 value2 判断两个值不相等 3value IS NOT NULL 判断value不为空逻辑函数Logical Functions 逻辑函数就是一个逻辑表达式也就是用与AND、或OR、非NOT将布尔类型的值连接起来也可以用判断语句IS、IS NOT进行真值判断返回的还是一个布尔类型的值。例如 1boolean1 OR boolean2 布尔值boolean1与布尔值boolean2取逻辑或 2boolean IS FALSE 判断布尔值boolean是否为false 3NOT boolean 布尔值boolean取逻辑非算术函数Arithmetic Functions 进行算术计算的函数包括用算术符号连接的运算和复杂的数学运算。例如 1numeric1 numeric2 两数相加 2POWER(numeric1, numeric2) 幂运算取数numeric1的numeric2次方 3RAND() 返回0.0, 1.0区间内的一个double类型的伪随机数字符串函数String Functions 进行字符串处理的函数。例如 1string1 || string2 两个字符串的连接 2UPPER(string) 将字符串string转为全部大写 3CHAR_LENGTH(string) 计算字符串string的长度时间函数Temporal Functions 进行与时间相关操作的函数。例如 1DATE string 按格式yyyy-MM-dd解析字符串string返回类型为SQL Date 2TIMESTAMP string 按格式yyyy-MM-dd HH:mm:ss[.SSS]解析返回类型为SQL timestamp 3CURRENT_TIME 返回本地时区的当前时间类型为SQL time与LOCALTIME等价 4INTERVAL string range 返回一个时间间隔。 2聚合函数Aggregate Functions 聚合函数是以表中多个行作为输入提取字段进行聚合操作的函数会将唯一的聚合值作为结果返回。聚合函数应用非常广泛不论分组聚合、窗口聚合还是开窗Over聚合对数据的聚合操作都可以用相同的函数来定义。 标准SQL中常见的聚合函数Flink SQL都是支持的目前也在不断扩展为流处理应用提供更强大的功能。例如 1COUNT(*) 返回所有行的数量统计个数。 2SUM([ ALL | DISTINCT ] expression) 对某个字段进行求和操作。默认情况下省略了关键字ALL表示对所有行求和如果指定DISTINCT则会对数据进行去重每个值只叠加一次。 3RANK() 返回当前值在一组值中的排名。 4ROW_NUMBER() 对一组值排序后返回当前值的行号。 其中RANK()和ROW_NUMBER()一般用在OVER窗口中。 Module操作 Module 允许 Flink 扩展函数能力。它是可插拔的Flink 官方本身已经提供了一些 Module用户也可以编写自己的 Module。 目前 Flink 包含了以下三种 Module CoreModuleCoreModule 是 Flink 内置的 Module其包含了目前 Flink 内置的所有 UDFFlink 默认开启的 Module 就是 CoreModule我们可以直接使用其中的 UDFHiveModuleHiveModule 可以将 Hive 内置函数作为 Flink 的系统函数提供给 SQL\Table API 用户进行使用比如 get_json_object 这类 Hive 内置函数Flink 默认的 CoreModule 是没有的用户自定义 Module用户可以实现 Module 接口实现自己的 UDF 扩展 Module 使用 LOAD 子句去加载 Flink SQL 体系内置的或者用户自定义的 ModuleUNLOAD 子句去卸载 Flink SQL 体系内置的或者用户自定义的 Module。 1语法 -- 加载 LOAD MODULE module_name [WITH (key1 val1, key2 val2, ...)] -- 卸载 UNLOAD MODULE module_name-- 查看 SHOW MODULES; SHOW FULL MODULES;在 Flink 中Module 可以被 加载、启用 、禁用 、卸载 Module当加载Module 之后默认就是开启的。同时支持多个 Module 的并且根据加载 Module 的顺序去按顺序查找和解析 UDF先查到的先解析使用。 此外Flink 只会解析已经启用了的 Module。那么当两个 Module 中出现两个同名的函数且都启用时 Flink 会根据加载 Module 的顺序进行解析结果就是会使用顺序为第一个的 Module 的 UDF可以使用下面语法更改顺序 USE MODULE hive,core;USE是启用module没有被use的为禁用禁用不是卸载除此之外还可以实现调整顺序的效果。上面的语句会将 Hive Module 设为第一个使用及解析的 Module。 2案例 加载官方已经提供的的 Hive Module将 Hive 已有的内置函数作为 Flink 的内置函数。需要先引入 hive 的 connector。其中包含了 flink 官方提供的一个 HiveModule。 1上传jar包到flink的lib中 上传hive connector cp flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar /opt/module/flink-1.17.0/lib/注意拷贝hadoop的包解决依赖冲突问题 cp /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar /opt/module/flink-1.17.0/lib/2重启flink集群和sql-client 3加载hive module -- hive-connector内置了hive module提供了hive自带的系统函数 load module hive with (hive-version3.1.3); show modules; show functions;-- 可以调用hive的split函数 select split(a,b, ,);常用 Connector 读写 12.5.0中的DataGen和Print都是一种connector其他connector参考官网链接 Kafka 1添加kafka连接器依赖 1将flink-sql-connector-kafka-1.17.0.jar上传到flink的lib目录下 2重启yarn-session、sql-client2普通Kafka表 1创建Kafka的映射表 CREATE TABLE t1( event_time TIMESTAMP(3) METADATA FROM timestamp,--列名和元数据名一致可以省略 FROM xxxx, VIRTUAL表示只读partition BIGINT METADATA VIRTUAL,offset BIGINT METADATA VIRTUAL, id int, ts bigint , vc int ) WITH (connector kafka,properties.bootstrap.servers hadoop103:9092,properties.group.id jjm, -- earliest-offset, latest-offset, group-offsets, timestamp and specific-offsetsscan.startup.mode earliest-offset,-- fixed为flink实现的分区器一个并行度只写往kafka一个分区 sink.partitioner fixed,topic ws1,format json ) 2插入Kafka表 insert into t1(id,ts,vc) select * from source3查询Kafka表 select * from t13upsert-kafka表 如果当前表存在更新操作那么普通的kafka连接器将无法满足此时可以使用Upsert Kafka连接器。 Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。 作为 sourceupsert-kafka 连接器生产 changelog 流其中每条数据记录代表一个更新或删除事件。更准确地说数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE如果有这个 key如果不存在相应的 key则该更新被视为 INSERT。用表来类比changelog 流中的数据记录被解释为 UPSERT也称为 INSERT/UPDATE因为任何具有相同 key 的现有行都被覆盖。另外value 为空的消息将会被视作为 DELETE 消息。 作为 sinkupsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入并将 DELETE 数据以 value 为空的 Kafka 消息写入表示对应 key 的消息被删除。Flink 将根据主键列的值对数据进行分区从而保证主键上的消息有序因此同一主键上的更新/删除消息将落在同一分区中。 1创建upsert-kafka的映射表(必须定义主键) CREATE TABLE t2( id int , sumVC int ,primary key (id) NOT ENFORCED ) WITH (connector upsert-kafka,properties.bootstrap.servers hadoop102:9092,topic ws2,key.format json,value.format json )2插入upsert-kafka表 insert into t2 select id,sum(vc) sumVC from source group by id3查询upsert-kafka表 upsert-kafka 无法从指定的偏移量读取只会从主题的源读取。如此才知道整个数据的更新过程。并且通过 -UUI 等符号来显示数据的变化过程。 select * from t2Catalog Catalog 提供了元数据信息例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。 数据处理最关键的方面之一是管理元数据。元数据可以是临时的例如临时表、UDF。 元数据也可以是持久化的例如 Hive MetaStore 中的元数据。Catalog 提供了一个统一的API用于管理元数据并使其可以从 Table API 和 SQL 查询语句中来访问。 Catalog 允许用户引用其数据存储系统中现有的元数据并自动将其映射到 Flink 的相应元数据。例如Flink 可以直接使用 Hive MetaStore 中的表的元数据不必在Flink中手动重写ddl也可以将 Flink SQL 中的元数据存储到 Hive MetaStore 中。Catalog 极大地简化了用户开始使用 Flink 的步骤并极大地提升了用户体验。 Catalog类型 目前 Flink 包含了以下四种 Catalog GenericInMemoryCatalog基于内存实现的 Catalog所有元数据只在session 的生命周期即一个 Flink 任务一次运行生命周期内内可用。默认自动创建会有名为“default_catalog”的内存Catalog这个Catalog默认只有一个名为“default_database”的数据库。JdbcCatalogJdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。Postgres Catalog和MySQL Catalog是目前仅有的两种JDBC Catalog实现将元数据存储在数据库中。HiveCatalog有两个用途一是单纯作为 Flink 元数据的持久化存储二是作为读写现有 Hive 元数据的接口。注意Hive MetaStore 以小写形式存储所有元数据对象名称。Hive Metastore以小写形式存储所有元对象名称而 GenericInMemoryCatalog会区分大小写。用户自定义 Catalog用户可以实现 Catalog 接口实现自定义 Catalog。从Flink1.16开始引入了用户类加载器通过CatalogFactory.Context#getClassLoader访问否则会报错ClassNotFoundException。 JdbcCatalogMySQL 具体用法也可以参考官网 link 1上传所需jar包到lib下 1.17的JDBC连接器下载路径 link cp flink-connector-jdbc-3.1.0-1.17.jar /opt/module/flink-1.17.0/lib/ cp mysql-connector-j-8.0.31.jar /opt/module/flink-1.17.0/lib/2重启flink集群和sql-client3创建Catalog JdbcCatalog支持以下选项: name:必需Catalog名称。default-database:必需连接到的默认数据库。username: 必需Postgres/MySQL帐户的用户名。password:必需该帐号的密码。base-url:必需数据库的jdbc url(不包含数据库名) 对于Postgres Catalog是jdbc:postgresql://:端口 对于MySQL Catalog是jdbc: mysql://:端口 CREATE CATALOG my_jdbc_catalog WITH( ‘type’ ‘jdbc’, ‘default-database’ ‘test’, ‘username’ ‘root’, ‘password’ ‘000000’, ‘base-url’ ‘jdbc:mysql://hadoop102:3306’ ); 4查看Catalog SHOW CATALOGS;--查看当前的CATALOG SHOW CURRENT CATALOG;5使用指定Catalog USE CATALOG my_jdbc_catalog;--查看当前的CATALOG SHOW CURRENT CATALOG;HiveCatalog 官网参考 link 1上传所需jar包到lib下 cp flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar /opt/module/flink-1.17.0/lib cp mysql-connector-j-8.0.31.jar /opt/module/flink-1.17.0/lib/2更换planner依赖 只有在使用Hive方言或HiveServer2时才需要这样额外的计划器jar移动但这是Hive集成的推荐设置。 mv /opt/module/flink-1.17.0/opt/flink-table-planner_2.12-1.17.0.jar /opt/module/flink-1.17.0/lib/flink-table-planner_2.12-1.17.0.jarmv /opt/module/flink-1.17.0/lib/flink-table-planner-loader-1.17.0.jar /opt/module/flink-1.17.0/opt/flink-table-planner-loader-1.17.0.jar3重启flink集群和sql-client4启动外置的hive metastore服务 Hive metastore必须作为独立服务运行也就是hive-site中必须配置 hive.metastore.uris hive --service metastore 5创建Catalog 配置项必需默认值类型说明typeYes(none)StringCatalog类型创建HiveCatalog时必须设置为’hive’。nameYes(none)StringCatalog的唯一名称hive-conf-dirNo(none)String包含hive -site.xml的目录,需要Hadoop文件系统支持。如果没指定hdfs协议则认为是本地文件系统。如果不指定该选项则在类路径中搜索hive-site.xml。default-databaseNodefaultStringHive Catalog使用的默认数据库hive-versionNo(none)StringHiveCatalog能够自动检测正在使用的Hive版本。建议不要指定Hive版本除非自动检测失败。hadoop-conf-dirNo(none)StringHadoop conf目录的路径。只支持本地文件系统路径。设置Hadoop conf的推荐方法是通过HADOOP_CONF_DIR环境变量。只有当环境变量不适合你时才使用该选项例如如果你想分别配置每个HiveCatalog。 CREATE CATALOG myhive WITH (type hive,default-database default,hive-conf-dir /opt/module/hive/conf );6查看Catalog SHOW CATALOGS;--查看当前的CATALOG SHOW CURRENT CATALOG;7使用指定Catalog USE CATALOG myhive;--查看当前的CATALOG SHOW CURRENT CATALOG;建表退出sql-client重进查看catalog和表还在。 8读写Hive表 SHOW DATABASES; -- 可以看到hive的数据库USE test; -- 可以切换到hive的数据库SHOW TABLES; -- 可以看到hive的表SELECT * from ws; --可以读取hive表INSERT INTO ws VALUES(1,1,1); -- 可以写入hive表File 1创建FileSystem映射表 CREATE TABLE t3( id int, ts bigint , vc int ) WITH (connector filesystem,path hdfs://hadoop102:8020/data/t3,format csv )2写入 insert into t3 select * from source3查询 select * from t3 where id 14报错问题 如果有以下报错 如上报错是因为之前lib下放了sql-hive的连接器jar包解决方案有两种 将hive的连接器jar包挪走重启yarn-session、sql-client mv flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar.bak同HiveCatalog中的操作替换planner的jar包 JDBCMySQL Flink在将数据写入外部数据库时使用DDL中定义的主键。如果定义了主键则连接器以upsert模式操作否则连接器以追加模式操作。 在upsert模式下Flink会根据主键插入新行或更新现有行Flink这样可以保证幂等性。为了保证输出结果符合预期建议为表定义主键并确保主键是底层数据库表的唯一键集或主键之一。在追加模式下Flink将所有记录解释为INSERT消息如果底层数据库中发生了主键或唯一约束违反则INSERT操作可能会失败。 1mysql的test库中建表 CREATE TABLE ws2 (id int(11) NOT NULL,ts bigint(20) DEFAULT NULL,vc int(11) DEFAULT NULL,PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf82添加JDBC连接器依赖 上传jdbc连接器的jar包和mysql的连接驱动包到flink/lib下 flink-connector-jdbc-3.1.0-1.17.jarmysql-connector-j-8.0.31.jar 3创建JDBC映射表 CREATE TABLE t4 (id INT,ts BIGINT, vc INT, PRIMARY KEY (id) NOT ENFORCED ) WITH (connectorjdbc,url jdbc:mysql://hadoop102:3306/test?useUnicodetruecharacterEncodingUTF-8,username root,password 000000,connection.max-retry-timeout 60s,table-name ws2,sink.buffer-flush.max-rows 500,sink.buffer-flush.interval 5s,sink.max-retries 3,sink.parallelism 1 );4查询 select * from t45写入 insert into t4 select * from sourcesql-client 中使用 savepoint 1提交一个insert作业可以给作业设置名称 INSERT INTO sink select * from source;2查看job列表 SHOW JOBS;3停止作业触发savepoint SET state.checkpoints.dirhdfs://hadoop102:8020/chk; SET state.savepoints.dirhdfs://hadoop102:8020/sp;STOP JOB 228d70913eab60dda85c5e7f78b5782c WITH SAVEPOINT;4从savepoint恢复 -- 设置从savepoint恢复的路径 SET execution.savepoint.pathhdfs://hadoop102:8020/sp/savepoint-37f5e6-0013a2874f0a; -- 之后直接提交sql就会从savepoint恢复--允许跳过无法还原的保存点状态 set execution.savepoint.ignore-unclaimed-state true; 5恢复后重置路径 指定execution.savepoint.path后将影响后面执行的所有DML语句可以使用RESET命令重置这个配置选项。 RESET execution.savepoint.path;如果出现reset没生效的问题可能是个bug我们可以退出sql-client再重新进不需要重启flink的集群。
http://www.zqtcl.cn/news/601166/

相关文章:

  • 手机管理网站网站打开速度优化
  • 做微网站需要什么做的比较好的美食网站有哪些
  • 五金商城网站建设注意wordpress虚拟空
  • 成都工程网站建设网站界面设计的优点
  • 网站建设里的知识找别人做公司网站第一步做什么
  • 婚纱摄影网站模板之家专业seo网站优化公司
  • 商丘市住房和城乡建设局网站广西网站建设timkee
  • php网站开发是做什么的网站策划总结
  • 站长工具seo推广秒收录WordPress注册插件中文
  • 目前个人网站做地最好是哪几家做汽配网站需要多少钱
  • php做网站多少钱网络营销推广方案3篇
  • 浙江坤宇建设有限公司 网站省直部门门户网站建设
  • 直播类网站怎么做上海市建设质量协会网站
  • 筑巢做网站怎么样网站设计接单
  • 会ps的如何做网站wordpress 仿虎嗅
  • 免费响应式网站建设嘉兴建企业网站
  • 织梦网站首页幻灯片不显示建设银行网站特色
  • php企业网站开发东莞网站建设时间
  • 仿win8网站模板网站开发接私活的经理
  • 仿牌网站 域名注册衡水安徽网站建设
  • 合肥义城建设集团有限公司网站专业建站公司电话咨询
  • 国外平面设计网站有哪些建商城网站公司
  • 深圳做响应式网站网站建设公司行业现状
  • 网站部署城阳网站开发公司
  • 旅游网站的网页设计素材如何网络推广运营
  • 惠州网站建设多少钱注册邮箱
  • 视频制作网站都有哪些网站优化的公司
  • 网站开发运营推广叫什么苏州seo关键词优化推广
  • 龙泉驿区建设局网站引流推广平台软件
  • 做盗版网站韩国服装网站建设