犀牛云网站建设特点,wordpress微信支付插件下载,wordpress移动端悬浮导航代码,大连开发区二手房最新房源第一章 CDC简介
1.1 什么是CDC
CDC (Change Data Capture 变更数据获取#xff09;的简称。核心思想就是#xff0c;检测并获取数据库的变动#xff08;增删查改#xff09;#xff0c;将这些变更按发生的顺序记录下来#xff0c;写入到消息中间件以供其它服务进行订…第一章 CDC简介
1.1 什么是CDC
CDC (Change Data Capture 变更数据获取的简称。核心思想就是检测并获取数据库的变动增删查改将这些变更按发生的顺序记录下来写入到消息中间件以供其它服务进行订阅及消费。
1.2 CDC的种类
主要分为两大类 基于查询 通过sql查询来获取变化部分的数据。如通过时间查询前一天、最近一个小时的数据。 基于binlog日志 binlog记录了历史操作通过binlog日志再执行一次和数据库一样的操作如增删查改就能实现同步数据了。
基于查询基于Binlog执行模式batchstreaming是否可以捕获所有数据变化否是延迟高延迟低延迟是否增加数据库压力是否 1.3 Flink-CDC
Flink社区开发了flink-cdc-connectors组件这是一个可以直接从mysql、PostgreSQL等数据库直接读取全量数据和增量变更数据的source组件。
项目地址https://github.com/ververica/flink-cdc-connectors 既然已经有了很多CDC的方案为什么还需要Flink-CDC 其实Flink内部使用的就是Debezium基于binlog日志的方式。只是我们获取到数据变更之后往往需要进一步分析处理而实时的分析处理会用flink来做处理。这样的话就经过了两步。Flink-CDC相当于把CDC集成到了Flink中可以获取到数据变更后直接处理一部到位。 第二章 Flink-CDC案例实操
2.1 DataStream方式的应用
2.1.1 导入依赖 dependencygroupIdcom.ververica/groupIdartifactIdflink-connector-mysql-cdc/artifactIdversion2.4.0/version
/dependencypom.xml propertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetflink.version1.17.0/flink.versionproject.build.sourceEncodingUTF-8/project.build.sourceEncoding
/propertiesdependencies!-- Flink相关依赖 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-base/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdcom.ververica/groupIdartifactIdflink-connector-mysql-cdc/artifactIdversion2.4.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner-loader/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-runtime/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/version/dependency!-- 其它 --dependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.24/version/dependency
/dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion3.2.0/versionexecutionsexecutionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationartifactSetexcludesexcludecom.google.code.findbugs:jsr305/exclude/excludes/artifactSetfiltersfilter!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --artifact*:*/artifactexcludesexcludeMETA-INF/*.SF/excludeexcludeMETA-INF/*.DSA/excludeexcludeMETA-INF/*.RSA/exclude/excludes/filter/filterstransformerstransformer implementationorg.apache.maven.plugins.shade.resource.ManifestResourceTransformer!-- Replace this with the main class of your job --mainClasscom.zlin.flink.cdc.FinkCdc/mainClass/transformertransformer implementationorg.apache.maven.plugins.shade.resource.ServicesResourceTransformer//transformers/configuration/execution/executions/plugin/plugins
/build2.1.2 编写代码
创建库表
create database if not exists db_cdc_test;create table if not exists db_cdc_test.tb_a(id BIGINT auto_increment primary key comment 自增id,name VARCHAR(20),age INT
);开启binlog日志
[roothadoop102 ~]# vim /etc/my.cnf
[mysqld]
server-id1
log-binmysql-bin
binlog_formatrow
binlog_do_dbdb_cdc_test编写代码(官网示例)
package com.zlin.flink.cdc;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** author ZLin* since 2023/7/16*/
public class FinkCdc {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);MySqlSourceString mySqlSource MySqlSource.Stringbuilder().hostname(hadoop102).port(3306).username(root).password(xxxxxxxx).databaseList(db_cdc_test).tableList(db_cdc_test.tb_a).deserializer(new JsonDebeziumDeserializationSchema()).build();env.enableCheckpointing(3000);env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),MSQL Source).setParallelism(4).print().setParallelism(1);env.execute(Print MySQL Snapshot Binlog);}
}
注意 读取binlog是可以选择模式的 .startupOptions() 默认是 StartupOptions.initial() 创建表时必须要有主键不然没有输出也不报错…我就被这个地方坑了很久 如果是 StartupOptions.latest() 则可以没有主键 2.1.3 案例测试
IDEA上运行
在数据库中对表增删查改然后观察控制台输出
集群上运行
step1.打包
step2.将jar上传至服务器提交任务
[roothadoop102 bin]# ./flink run -t yarn-per-job /opt/jars/flink-cdc-1.0-SNAPSHOT.jar查看输出 设置checkpoint增量读取binlog
step1.编写代码
package com.zlin.flink.cdc;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** author ZLin* since 2023/7/16*/
public class FinkCdc {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);MySqlSourceString mySqlSource MySqlSource.Stringbuilder().hostname(hadoop102).port(3306).username(root).password(root2023).databaseList(db_cdc_test).tableList(db_cdc_test.tb_a).deserializer(new JsonDebeziumDeserializationSchema()).build();// 启动检查点env.enableCheckpointing(5000);// 检查点配置CheckpointConfig checkpointConfig env.getCheckpointConfig();checkpointConfig.setCheckpointTimeout(10000);checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);checkpointConfig.setMaxConcurrentCheckpoints(1);env.setStateBackend(new HashMapStateBackend());env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),MSQL Source).setParallelism(4).print().setParallelism(1);env.execute(Print MySQL Snapshot Binlog);}
}step2.打包、提交任务
[roothadoop102 bin]# ./flink run -t yarn-per-job /opt/jars/flink-cdc-1.0-SNAPSHOT.jar
....
2023-08-02 17:31:51,211 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface hadoop103:43015 of application application_1687072825718_0040.
Job has been submitted with JobID e7e1b088cdd924a952fb3d95aa171dbc这里通过flink on yarn的方式提交的任务需要记录两个地方后面创建savepoint需要用到
yidapplication_1687072825718_0040
jobide7e1b088cdd924a952fb3d95aa171dbc
step3.创建保存点
配置保存点存储路径这里使用默认路径
[roothadoop102 conf]# vim flink-conf.yaml
[roothadoop103 conf]# vim flink-conf.yaml
[roothadoop104 conf]# vim flink-conf.yaml
state.savepoints.dir: hdfs://hadoop102:9000/flink-savepoints创建保存点
[roothadoop102 flink-1.17.1]# bin/flink savepoint e7e1b088cdd924a952fb3d95aa171dbc -yid application_1687072825718_0040step4.kill掉任务我们再进行一些增删查改的工作然后从保存点开始恢复任务
[roothadoop102 bin]# ./flink run -s hdfs://hadoop102:9000/flink-savepoints/savepoint-e7e1b0-3aa5a2eda751 -t yarn-per-job /opt/jars/flink-cdc-1.0-SNAPSHOT.jar原来读过的操作不再读取从没有读过的操作开始读取 2.2 FlinkSQL方式的应用
2.1.1 代码实现
package com.zlin.flink.cdc;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;/*** author ZLin* since 2023/8/2*/
public class FlinkSqlCdc {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 创建Flink Table环境StreamTableEnvironment tEnv StreamTableEnvironment.create(env);// 使用FlinkSQL模式创建CDC表tEnv.executeSql(CREATE TABLE tb_a (id string, \n name string,\n age string\n ) WITH (\n connectormysql-cdc, hostnamehadoop102, usernamexxxx, passwordxxxx, database-namedb_cdc_test, table-nametb_a, scan.startup.modelatest-offset, scan.incremental.snapshot.chunk.key-columnid));// 查询数据并转换为流输出Table table tEnv.sqlQuery(select * from tb_a);tEnv.toChangelogStream(table).print();env.execute(Flink SQL CDC);}
}刚开始一直报错后面发下是我mysql密码里面有个字符,改了密码去掉特殊字符之后就正常了。不知道什么原因。。。。 2.3 自定义反序列化器
2.3.1 代码实现
自定义序列化器 CustomDeserializationSchema
package com.zlin.flink.cdc.func;import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;import java.util.List;/*** author ZLin* since 2023/8/6*/
public class CustomDeserializationSchema implements DebeziumDeserializationSchemaString {Overridepublic void deserialize(SourceRecord sourceRecord, CollectorString collector) throws Exception {JSONObject result new JSONObject();String topic sourceRecord.topic();String[] fields topic.split(\\.);result.put(db, fields[1]);result.put(tableName, fields[2]);Struct value (Struct) sourceRecord.value();// before数据result.put(before, getData(before, value));// after数据result.put(after, getData(after, value));// 操作类型Envelope.Operation operation Envelope.operationFor(sourceRecord);result.put(op, operation);collector.collect(result.toString());}/*** 返回类型** return 返回类型*/Overridepublic TypeInformationString getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}private JSONObject getData(String dataType, Struct value) throws Exception {Struct data value.getStruct(dataType);JSONObject dataJson new JSONObject();if (data ! null) {Schema schema data.schema();ListField fieldsList schema.fields();if (fieldsList ! null !fieldsList.isEmpty()) {for (Field field : fieldsList) {try {dataJson.put(field.name(), data.get(field));} catch (JSONException e) {throw new Exception(String.format(字段%s读取失败, field.name()), e);}}}}return dataJson;}
}使用自定义序列化器改变输出格式方便后续处理
package com.zlin.flink.cdc;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.zlin.flink.cdc.func.CustomDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** author ZLin* since 2023/7/16*/
public class FinkCdcCustomDeserialization {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 启动检查点env.enableCheckpointing(3000);MySqlSourceString mySqlSource MySqlSource.Stringbuilder().hostname(hadoop102).port(3306).username(root).password(xxxx).databaseList(db_cdc_test).tableList(db_cdc_test.tb_a).startupOptions(StartupOptions.initial()).deserializer(new CustomDeserializationSchema()).build();env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),MSQL Source).setParallelism(4).print().setParallelism(1);env.execute(Print MySQL Snapshot Binlog);}
}
输出
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
{db:db_cdc_test,tableName:tb_a,before:{},after:{id:3,name:yy,age:55},op:READ}
{db:db_cdc_test,tableName:tb_a,before:{},after:{id:2,name:pp,age:33},op:READ}
{db:db_cdc_test,tableName:tb_a,before:{},after:{id:1,name:rr,age:11},op:READ}
八月 08, 2023 9:30:58 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to hadoop102:3306 at mysql-bin.000002/17799 (sid:5925, cid:1129)2.4 DataStream和Flink SQL方式区别
DataStream方式 Flink 1.12可使用Flink SQL方式 1.13版本可用DataStream方式支持多库多表的监控而Flink SQL只能单表监控
第三章 Flink-CDC 2.0
3.1 1.x痛点 一致性通过加锁保证 Debezium在保证一致性的时候需要全局锁可能会增加数据库hang住的风险并且容易对在线业务造成影响且DBA一般不给锁权限。 不支持水平扩展 单并发全量读取时数据量大的时候耗时很长 全量读取时不支持checkpoint CDC读取分为两个阶段全量读取和增量读取。1.x在全量读取阶段失败后需要重新全部读取。
3.2 设计目标 2.0的设计目标就是为了解决1.x的痛点 无锁水平扩展支持checkpoint
3.2.1 引入【Debezium 锁分析】 Flink CDC 底层封装了Debezium那我们先来了解下Debezium的锁机制 Debezium 同步一张表分为两个阶段
全量阶段查询当前表中所有记录增量阶段从binlog中消费变更数据
大部分用户使用的场景都是全量 增量同步加锁是发生在全量阶段目的是为了确定全量阶段的初始位点保证增量 全量实现一条不多一条不少从而保证数据一致性。从下图中我们可以分析全局锁和表锁的一些加锁流程左边红色线条是锁的生命周期右边是MySQL 开启可重复读事务的生命周期。 以全局锁为例 首先是获取一个锁 然后再去开启可重复读的事务。这里加锁范围是读取binlog 的当前位点和当前表的schema。这样做的目的是保证binlog 的起始位置和读取到的当前schema 是可以对应上的因为表的schema 是会改变的比如删除列或者增加列。在读取这两个信息后SnapshotReader 会在可重复读事务里读取全量数据在全量数据读取完成后会启动BinlogReader 从读取的binlog 起始位置开始增量读取从而保证全量数据 增量数据的无缝衔接。
表锁是全局锁的退化版因为全局锁的权限会比较高因此在某些场景用户可能没有全局锁的权限但是有表锁的权限。不过表锁的加锁时间会更长因为表锁有个特征锁提前释放了可重复读的事务默认会提交所以锁需要等到全量数据读完后才能释放。
不管是全局锁还是表级锁这些锁到底会造成怎样严重的后果: Flink CDC 1.x 默认使用全局锁保证了数据一致性但存在上述hang 住数据的风险。因此Flink CDC 1.x 也提供显式配置关闭加锁操作但可能会导致数据准确性问题。
3.3 设计实现 借鉴了Netflix的DBlog的无锁设计思想 3.3.1 整体概览
对有主键的表 初始化模式整体的流程主要分为以下5个阶段
step1.Chunk切分
step2.Chunk分配
step3.Chunk读取
step4.Chunk汇报
step5.Chunk分配 3.3.2 Chunk切分 对于一张有主键的表我们可以根据表的主键对表中的数据进行分片。假设100条记录的表key为[1,100]我们按照步长10来进行切分按左闭右开或者左开右闭那么切分成了null,10),[10,20),…,[90,100),[100,null) 相当于把表切分成了 chunk-0null,10) … chunk-10:[100,null) 这就是chunk切分的过程通过切分我们得到多个chunk然后我们再并发的对每个chunk分别进行全量读取和增量读取这样就解决了水平扩展的问题。也就是说当数据量很大时我们全量读取的时候不再是单并发可以做到多并发大大提高读取效率。 切分算法描述 3.3.3 Chunk读取
因为每个chunk 只负责自己主键范围内的数据不难推导只要能够保证每个Chunk 读取的一致性就能保证整张表读取的一致性这便是无锁算法的基本原理。
如何实现单个chunk的读取在没有锁的情况下保证一致性呢 整个思想是借鉴了Netflix 的DBLog 论文中Chunk 读取的无锁算法 Netflix 的DBLog 论文中Chunk读取算法是通过在数据库中维护一张信号表再通过信号表在binlog文件中打点记录每个chunk 读取前的Low Position (低位点) 和读取结束之后High Position (高位点) 在低位点和高位点之间去查询该Chunk的全量数据。在读取出这一部分Chunk 的数据之后再将这2个位点之间的binlog增量数据合并到chunk所属的全量数据从而得到高位点时刻该chunk 对应的全量数据。 解释假如现在有一张表a,里面就两条记录(zhangsan, 男, 20), (李四, 男, 30) 在读取之前我们记录下当前位点Low Position假设位置为1假设在读完第一条记录的时候我们做了一些其它操作增删查改删了性别这一列然后我们读取完第二条记录李四30。我们得到的全量数据为(zhangsan, 男, 20),李四30读取完之后记录位点High Position假设是2。此时我们需要拿读取到的数据和 binlog中Low Position到High Position的操作去做一个合并这里的合并不是拿我们读取的全量数据去执行从Low Position到High Position的所有操作。而是以操作最后的结果为准得到在High Position点的全量数据。 Flink CDC 结合自身的情况在Chunk 读取算法上做了去信号表的改进不需要侵入业务去额外维护信号表直接通过读取binlog 位点替代在binlog 中做标记的功能整体的chunk 读算法描述如下图所示 比如正在读取Chunk-1Chunk 的区间是[K1, K10]首先直接将该区间内的数据select 出来并把它存在buffer 中在select 之前记录binlog 的当前位点(低位点)select 完成后再次记录binlog 的当前位点(高位点)。然后开始消费从低位点到高位点的binlog并合并到buffer 中。 其实就是把地位点、高位点存在信号表换成了存在buffer中 图中的-(k2,100) 和(k2,108) 记录表示这条数据的值从100 更新到108第二条记录是删除k3第三条记录是更新k2 为119第四条记录是k5 的数据由原来的77 变更为100。
观察图片中右下角最终的输出会发现在消费该chunk 的binlog 时出现的key 是k2、k3、k5我们前往buffer 将这些key 做标记。 对于k1、k4、k6、k7 来说在高位点读取完毕之后这些记录没有变化过所以这些数据是可以直接输出的 对于改变过的数据则需要将增量的数据合并到全量的数据中只保留合并后的最终数据。例如k2 最终的结果是119 那么只需要输出(k2,119)而不需要中间发生过改变的数据。
通过这种方式Chunk 最终的输出就是该chunk 区间在高位点对应的一致性快照数据。
3.3.4 Chunk分配 上面我们讲述了单个Chunk 的一致性读但是如果有多个表分了很多不同的Chunk且这些Chunk 分发到了不同的task 中那么如何分发Chunk 并保证全局一致性读呢 这个就是基于FLIP-27 来优雅地实现的通过下图可以看到有SourceEnumerator 的组件这个组件主要用于Chunk 的划分划分好的Chunk 会提供给下游的SourceReader 去读取通过把chunk 分发给不同的SourceReader 便实现了并发读取Snapshot Chunk 的过程同时基于FLIP-27 我们能较为方便地做到chunk 粒度的checkpoint。 3.3.5 Chunk汇报 当Snapshot Chunk 读取完成之后需要有一个汇报的流程如下图中橘色的汇报信息将Snapshot Chunk 完成信息汇报给SourceEnumerator。 汇报的主要目的是为了后续分发binlog chunk (就是告知全量阶段已经完成可以开始后续的增量阶段)。因为Flink CDC 支持全量 增量同步所以当所有Snapshot Chunk 读取完成之后还需要消费增量的binlog。
3.3.6 Chunk分配 所以当所有Snapshot Chunk 读取完成之后还需要消费增量的binlog这是通过下发一个binlog chunk 给任意一个Source Reader 进行单并发读取实现的。 3.3.7 总结
整体流程 通过主键对表进行Snapshot Chunk 划分-将Snapshot Chunk 分 发给多个SourceReader每个Snapshot Chunk 读取时通过算法实现无锁条件下的一致性读 SourceReader 读取时支持chunk 粒度的checkpoint-所有Snapshot Chunk 读取完成后 下发一个binlog chunk 进行增量部分的binlog 读取
提供MySQL CDC 2.0核心feature 包括:
○ 并发读取全量数据的读取性能可以水平扩展
○ 全程无锁不对线上业务产生锁的风险
○ 断点续传支持全量阶段的checkpoint。