域名不作网站用途,莱州环球网站建设,做数据网站,昆山网站建设机构博主历时三年精心创作的《大数据平台架构与原型实现#xff1a;数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行#xff0c;点击《重磅推荐#xff1a;建大数据平台太难了#xff01;给我发个工程原型吧#xff01;》了解图书详情#xff0c;…博主历时三年精心创作的《大数据平台架构与原型实现数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行点击《重磅推荐建大数据平台太难了给我发个工程原型吧》了解图书详情京东购书链接https://item.jd.com/12677623.html扫描左侧二维码进入京东手机购书页面。
使用 Flink 压测 MySQL 是一个不错的注意或者有时候我们需要在 MySQL 中生成一些可控的测试数据这时使用 Flink 的 Faker Connector 就是会很简单。本文记录一下操作方法。
1. 创建 MySQL 测试表
测试表明 MySQL 作为 Sink 时是不能通过 Flink SQL 在 MySQL 上建库或表的只能是 Flink 读取 MySQL 上现成的表 所以需要先在数据库将表建出来。
CREATE DATABASE IF NOT EXISTS example;
CREATE TABLE IF NOT EXISTS example.currency_rates (currency_code CHAR(3) NOT NULL,eur_rate DECIMAL(6,4) NOT NULL,rate_time DATETIME NOT NULL,PRIMARY KEY (currency_code)
);2. 创建 Flink Faker 表并持续写入 MySQL
create catalog mysql_datasource with (typejdbc,base-urljdbc:mysql://10.0.13.30:3307,default-databaseexample,usernameroot,passwordAdmin1234!
);drop table if exists currency_rates;
create table if not exists currency_rates with (connector faker,fields.currency_code.expression #{Currency.code},fields.eur_rate.expression #{Number.randomdouble 4,0,10},fields.rate_time.expression #{Date.past 15,SECONDS},rows-per-second 100
) like mysql_datasource.example.currency_rates (excluding options);insert into mysql_datasource.example.currency_rates select * from currency_rates;
select * from mysql_datasource.example.currency_rates;由于上述表的主键 currency_code 的值是有限的166个Flink 的 JDBC Connector 的处理方式是如果写入的数据主键已存在则改为更新所以上述 SQL 不会报错刷新 MySQL 表会发现表中的数据在不停地更新。这张表特别适合作为维表测试 Temporal Join
以下单独在 Flink SQL 中创建 currency_rates 的 SQL
create table if not exists currency_rates (currency_code string,eur_rate decimal(6,4),rate_time timestamp(3),primary key (currency_code) not enforced
) with (connector jdbc,url jdbc:mysql://10.0.13.30:3307/example,table-name currency_rates,usernameroot,passwordAdmin1234!
);