网站制作公司优势,横沥建设网站,免费做网站的问题,合肥优化网站TiDB数据库从入门到精通系列之六#xff1a;使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka 一、技术流程二、搭建环境三、创建Kafka changefeed四、写入数据以产生变更日志五、配置 Flink 消费 Kafka 数据 一、技术流程
快速搭建 TiCDC 集群、Kafka 集群和 Flink 集群创建 c… TiDB数据库从入门到精通系列之六使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka 一、技术流程二、搭建环境三、创建Kafka changefeed四、写入数据以产生变更日志五、配置 Flink 消费 Kafka 数据 一、技术流程
快速搭建 TiCDC 集群、Kafka 集群和 Flink 集群创建 changefeed将 TiDB 增量数据输出至 Kafka使用 go-tpc 写入数据到上游 TiDB使用 Kafka console consumer 观察数据被写入到指定的 Topic可选配置 Flink 集群消费 Kafka 内数据
二、搭建环境
部署包含 TiCDC 的 TiDB 集群
在实验或测试环境中可以使用 TiUP Playground 功能快速部署 TiCDC命令如下
tiup playground --host 0.0.0.0 --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1
# 查看集群状态
tiup status三、创建Kafka changefeed
1.创建 changefeed 配置文件
根据 Flink 的要求和规范每张表的增量数据需要发送到独立的 Topic 中并且每个事件需要按照主键值分发 Partition。因此需要创建一个名为 changefeed.conf 的配置文件填写如下内容
[sink]
dispatchers [
{matcher [*.*], topic tidb_{schema}_{table}, partitionindex-value},
]2.创建一个 changefeed将增量数据输出到 Kafka
tiup ctl:vCLUSTER_VERSION cdc changefeed
create --serverhttp://127.0.0.1:8300
--sink-urikafka://127.0.0.1:9092/kafka-topic-name?protocolcanal-json
--changefeed-idkafka-changefeed
--configchangefeed.conf如果命令执行成功将会返回被创建的 changefeed 的相关信息包含被创建的 changefeed 的 ID 以及相关信息内容如下
Create changefeed successfully!
ID: kafka-changefeed
Info: {... changfeed info json struct ...}如果命令长时间没有返回你需要检查当前执行命令所在服务器到 sink-uri 中指定的 Kafka 机器的网络可达性保证二者之间的网络连接正常。
生产环境下 Kafka 集群通常有多个 broker 节点你可以在 sink-uri 中配置多个 broker 的访问地址这有助于提升 changefeed 到 Kafka 集群访问的稳定性当部分被配置的 Kafka 节点故障的时候changefeed 依旧可以正常工作。假设 Kafka 集群中有 3 个 broker 节点地址分别为 127.0.0.1:9092 / 127.0.0.2:9092 / 127.0.0.3:9092可以参考如下 sink-uri 创建 changefeed
tiup ctl:vCLUSTER_VERSION cdc changefeed create
--serverhttp://127.0.0.1:8300
--sink-urikafka://127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092/kafka-topic-name?protocolcanal-jsonpartition-num3replication-factor1max-message-bytes1048576
--configchangefeed.conf3.Changefeed 创建成功后执行如下命令查看 changefeed 的状态
tiup ctl:vCLUSTER_VERSION cdc changefeed list --serverhttp://127.0.0.1:8300四、写入数据以产生变更日志
完成以上步骤后TiCDC 会将上游 TiDB 的增量数据变更日志发送到 Kafka下面对 TiDB 写入数据以产生增量数据变更日志。
1.模拟业务负载
在测试实验环境下可以使用 go-tpc 向上游 TiDB 集群写入数据以让 TiDB 产生事件变更数据。如下命令首先在上游 TiDB 创建名为 tpcc 的数据库然后使用 TiUP bench 写入数据到这个数据库中。
tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 prepare
tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 run --time 300s2.消费 Kafka Topic 中的数据
changefeed 正常运行时会向 Kafka Topic 写入数据你可以通过由 Kafka 提供的 kafka-console-consumer.sh观测到数据成功被写入到 Kafka Topic 中
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic ${topic-name}至此TiDB 的增量数据变更日志就实时地复制到了 Kafka。下一步你可以使用 Flink 消费 Kafka 数据。当然你也可以自行开发适用于业务场景的 Kafka 消费端。
五、配置 Flink 消费 Kafka 数据
1.安装 Flink Kafka Connector
在 Flink 生态中Flink Kafka Connector 用于消费 Kafka 中的数据并输出到 Flink 中。Flink Kafka Connector 并不是内建的因此在 Flink 安装完毕后还需要将 Flink Kafka Connector 及其依赖项添加到 Flink 安装目录中。下载下列 jar 文件至 Flink 安装目录下的 lib 目录中如果你已经运行了 Flink 集群请重启集群以加载新的插件。
flink-connector-kafka-1.17.1.jarflink-sql-connector-kafka-1.17.1.jarkafka-clients-3.5.1.jar
2.创建一个表
可以在 Flink 的安装目录执行如下命令启动 Flink SQL 交互式客户端
[rootflink flink-1.15.0]# ./bin/sql-client.sh随后执行如下语句创建一个名为 tpcc_orders 的表
CREATE TABLE tpcc_orders (o_id INTEGER,o_d_id INTEGER,o_w_id INTEGER,o_c_id INTEGER,o_entry_d STRING,o_carrier_id INTEGER,o_ol_cnt INTEGER,o_all_local INTEGER
) WITH (
connector kafka,
topic tidb_tpcc_orders,
properties.bootstrap.servers 127.0.0.1:9092,
properties.group.id testGroup,
format canal-json,
scan.startup.mode earliest-offset,
properties.auto.offset.reset earliest
)请将 topic 和 properties.bootstrap.servers 参数替换为环境中的实际值。
3.查询表内容
执行如下命令查询 tpcc_orders 表中的数据
SELECT * FROM tpcc_orders;执行成功后可以观察到有数据输出如下图 至此就完成了 TiDB 与 Flink 的数据集成。