手机网站建,流程图制作网站,谷歌云 搭建wordpress,wordpress自定义用户注册亚马逊云科技Amazon MSK是Amazon云平台提供的托管Kafka服务。在系统升级或迁移时#xff0c;用户常常需要将一个Amazon MSK集群中的数据导出#xff08;备份#xff09;#xff0c;然后在新集群或另一个集群中再将数据导入#xff08;还原#xff09;。通常#xff0c;K…亚马逊云科技Amazon MSK是Amazon云平台提供的托管Kafka服务。在系统升级或迁移时用户常常需要将一个Amazon MSK集群中的数据导出备份然后在新集群或另一个集群中再将数据导入还原。通常Kafka集群间的数据复制和同步多采用Kafka MirrorMaker但是在某些场景中受环境限制两个于Kafka集群之间的网络可能无法连通或者两个亚马逊云科技账号相互隔离亦或是需要将Kafka的数据沉淀为文件存储以备他用。此时基于Kafka Connect S3 Source/Sink Connector的方案会是一种较为合适的选择本文就将介绍一下这一方案的具体实现。 数据的导出、导入、备份、还原通常都是一次性操作为此搭建完备持久的基础设施并无太大必要省时省力简单便捷才是优先的考量因素。为此本文将提供一套开箱即用的解决方案方案使用Docker搭建Kafka Connect所有操作均配备自动化Shell脚本用户只需设置一些环境变量并执行相应脚本即可完成全部工作。这种基于Docker的单体模式可以应对中小型规模的数据同步和迁移如果要寻求稳定、健壮的解决方案可以考虑将Docker版本的Kafka Connect迁移到Kubernetes或Amazon MSK Connect实现集群化部署。 整体架构 首先介绍一下方案的整体架构。导出/导入和备份/还原其实是两种高度类似的场景但为了描述清晰我们还是分开讨论。先看一下导出/导入的架构示意图 在这个架构中Source端的MSK是数据流的起点安装了S3 Sink Connector的Kafka Connect会从Source端的MSK中提取指定Topic的数据然后以Json或Avro文件的形式存储到S3上同时另一个安装了S3 Source Connector的Kafka Connect会从S3上读取这些Json或Avro文件然后写入到Sink端MSK的对应Topic中。如果Source端和Sink端的MSK集群不在同一个Region可以在各自的Region分别完成导入和导出然后在两个Region之间使用S3的Cross-Rejion Replication进行数据同步。 该架构只需进行简单的调整即可用于MSK集群的备份/还原如下图所示先将MSK集群的数据备份到S3上待完成集群的升级、迁移或重建工作后再从S3上将数据恢复到新建集群即可。 预设条件 本文聚焦于Kafka Connect的数据导出/导入和备份/还原操作需要提前准备 一台基于Amazon Linux2的EC2实例建议新建纯净实例本文所有的实操脚本都将在该实例上执行该实例也是运行Kafka Connect Docker Container的宿主机。 两个MSK集群一个作为Source一个作为Sink如果只有一个MSK集群也可完成验证该集群将既作Source又作Sink。 为聚焦Kafka Connect S3 Source/Sink Connector的核心配置预设MSK集群没有开启身份认证即认证类型为Unauthenticated数据传输方式为PLAINTEXT以便简化Kafka Connect的连接配置。 网络连通性上要求EC2实例能访问S3、Source端MSK集群、Sink端MSK集群。如果在实际环境中无法同时连通Source端和Sink端则可以在两台分属于不同网络的EC2上进行操作但它们必须都能访问S3。如果是跨Region或账号隔离则另需配置S3 Cross-Region Replication或手动拷贝数据文件。 全局配置 由于实际操作将不可避免地依赖到具体的亚马逊云科技账号以及本地环境里的各项信息如AKSK服务地址各类路径Topic名称等为了保证本文给出的操作脚本具有良好的可移植性将所有与环境相关的信息抽离出来以全局变量的形式在实操前集中配置。以下就是全局变量的配置脚本读者需要根据个人环境设定这些变量的取值 为了便于演示和解读本文将使用下面的全局配置其中前6项配置与账号和环境强相关仍需用户自行修改脚本中给出的仅为示意值而后5项配置与MSK数据的导入导出息息相关不建议修改因为后续的解读将基于这里设定的值展开待完成验证后您可再根据需要灵活修改后5项配置以完成实际的导入导出工作。 回到操作流程登录准备好的EC2实例修改下面脚本中与账号和环境相关的前6项配置然后执行修改后的脚本。此外需要提醒注意的是在后续操作中部分脚本执行后将不再返回而是持续占用当前窗口输出日志或Kafka消息因此需要新开命令行窗口每次新开窗口都需要执行一次这里的全局配置脚本。 关于上述脚本中的后5项配置有如下详细说明 我们就以脚本中设定的值为例解读一下这5项配置联合起来将要实现的功能同时也是本文将演示的主要内容 在Source端的MSK集群上存在两个名为source-topic-1和source-topic-2的Topic通过安装有S3 Sink Connector的Kafka ConnectDocker容器将两个Topic的数据导出到S3的指定存储桶中然后再通过安装有S3 Source Connector的Kafka ConnectDocker容器可以和S3 Source Connector共存为一个Docker容器将S3存储桶中的数据写入到Sink端的MSK集群上其中原source-topic-1的数据将被写入sink-topic-1原source-topic-2的数据将被写入sink-topic-2。 特别地如果是备份/还原场景需要保持导出/导入的Topic名称一致此时可直接删除S3 Source Connector中以transforms开头的4项配置将在下文中出现或者将下面两项改为 如果只有一个MSK集群同样可以完成本文的验证工作只需将SOURCE_KAFKA_BOOTSTRAP_SEVERS和SINK_KAFKA_BOOTSTRAP_SEVERS同时设置为该集群即可这样该集群既是Source端又是Sink端由于配置中的Source Topics和Sink Topics并不同名所以不会产生冲突。 环境准备 安装工具包 在EC2上执行以下脚本安装并配置jqyqdockerjdkkafka-console-client五个必须的软件包可以根据自身EC2的情况酌情选择安装全部或部分软件。建议使用纯净的EC2实例完成全部的软件安装 创建S3存储桶 整个方案以S3作为数据转储媒介为此需要在S3上创建一个存储桶。Source端MSK集群的数据将会导出到该桶中并以Json文件形式保存向Sink端MSK集群导入数据时读取的也是存储在该桶中的Json文件。 在源MSK上创建Source Topics 为了确保Topics数据能完整备份和还原S3 Source Connector建议Sink Topics的分区数最好与Source Topics保持一致如果让MSK自动创建Topic则很有可能会导致Source Topics和Sink Topics的分区数不对等所以选择手动创建Source Topics和Sink Topics并确保它们的分区数一致。以下脚本将创建source-topic-1和source-topic-2两个Topic各含9个分区 在目标MSK上创建Sink Topics 原因同上以下脚本将创建sink-topic-1和sink-topic-2两个Topic各含9个分区 制作Kafka Connect镜像 接下来是制作带S3 Sink Connector和S3 Source Connector的Kafka Connect镜像镜像和容器均以kafka-s3-syncer命名以下是具体操作 配置并启动Kafka Connect 镜像制作完成后就可以启动了Kafka Connect了。Kafka Connect有很多配置项需要提醒注意的是在下面的配置中使用的是Kafka Connect内置的消息转换器JsonConverter如果你的输入/输出格式是Avro或Parquet则需要另行安装对应插件并设置正确的Converter Class。 上述脚本执行后命令窗口将不再返回而是会持续输出容器日志因此下一步操作需要新开一个命令行窗口。 配置并启动S3 Sink Connector 在第5节的操作中已经将S3 Sink Connector安装到了Kafka Connect的Docker镜像中但是还需要显式地配置并启动它。新开一个命令行窗口先执行一遍《实操步骤(1)全局配置》声明全局变量然后执行以下脚本 配置并启动S3 Source Connector 同上在第5节的操作中已经将S3 Source Connector安装到了Kafka Connect的Docker镜像中同样需要显式地配置并启动它 至此整个环境搭建完毕一个以S3作为中转媒介的MSK数据导出、导入、备份、还原链路已经处于运行状态。 测试 现在来验证一下整个链路是否能正常工作。首先使用kafka-console-consumer.sh监控source-topic-1和sink-topic-1两个Topic然后使用脚本向source-topic-1持续写入数据如果在sink-topic-1看到了相同的数据输出就说明数据成功地从source-topic-1导出然后又导入到了sink-topic-1中相应的在S3存储桶中也能看到“沉淀”的数据文件。 打开Source Topic 新开一个命令行窗口先执行一遍《实操步骤(1)全局配置》声明全局变量然后使用如下命令持续监控source-topic-1中的数据 打开Sink Topic 新开一个命令行窗口先执行一遍《实操步骤(1)全局配置》声明全局变量然后使用如下命令持续监控sink-topic-1中的数据 向Source Topic写入数据 新开一个命令行窗口先执行一遍《实操步骤(1全局配置》声明全局变量然后使用如下命令向source-topic-1中写入数据 现象与结论 执行上述写入操作后从监控source-topic-1的命令行窗口中可以很快看到写入的数据这说明Source端MSK已经开始持续产生数据了随后约1分钟即可在监控sink-topic-1的命令行窗口中看到相同的输出数据这说明目标端的数据同步也已开始正常工作。此时打开S3的存储桶会发现大量Json文件这些Json是由S3 Sink Connector从source-topic-1导出并存放到S3上的然后S3 Source Connector又读取了这些Json并写入到了sink-topic-1中至此整个方案的演示与验证工作全部结束。 清理 在验证过程中可能需要多次调整并重试每次重试最好恢复到初始状态以下脚本会帮助清理所有已创建的资源 小结 本方案主要定位于轻便易用在S3 Sink Connector和S3 Source Connector中还有很多与性能、吞吐量相关的配置例如s3.part.size, flush.size, s3.poll.interval.ms, tasks.max等可以在实际需要自行调整此外Kafka Connect也可以方便地迁移到Kubernetes或Amazon MSK Connect中以实现集群化部署。