当前位置: 首页 > news >正文

angular2是做网站的还是手机的网站正在建设中 免费

angular2是做网站的还是手机的,网站正在建设中 免费,深圳的网站建设公司三把火,网站建设自助建站云建站文章目录 Canal整合SpringBoot详解#xff08;一#xff09;什么是canal搭建Kafka3.2.1集群⭐Kafka集群机器规划创建3台虚拟机#xff08;centos7系统#xff09;必要的环境准备#xff08;3台虚拟机都要执行如下操作#xff09;⭐分别修改每个服务器的hosts文件#xf… 文章目录 Canal整合SpringBoot详解一什么是canal搭建Kafka3.2.1集群⭐Kafka集群机器规划创建3台虚拟机centos7系统必要的环境准备3台虚拟机都要执行如下操作⭐分别修改每个服务器的hosts文件将上面的ip和主机名配置上去分别关闭每个服务器的防火墙分别为每个服务器安装jdk8分别为每个服务器安装Docker为每个节点的Docker接入阿里云镜像加速器为每个节点的docker设置开机自动启动 分别为每个服务器安装zookeeper3.7.1搭建zookeeper集群⭐分别为每个服务器安装Kafka3.2.1搭建Kafka集群⭐安装MySQL5.7配置canalmysql本次采用Docker的方式⭐ 案例1CanalKafka实现mysql和redis的数据同步⭐必要的环境配置canal.deployer启动canal.deployer⭐配置hosts由于我是Windows运行没有配置hosts导致无法识别kafka01主机名⭐创建一个SpringBoot项目⭐项目结构准备需要同步的数据库表⭐pom.xml⭐application.yml⭐RedisTemplateConfig配置类Config.class⭐canal要求必须要的实体类用于接收canal发送到kafka的同步消息⭐ConfigCanalBean.class⭐MysqlType.class⭐SqlType.class⭐ ConfigCanalRedisConsumerkafka消费者类监听指定topic把canal发送的消息同步到Redis中⭐ 创建kafka的topic我们指定的topic名称为canal-test-topic开始测试canal同步效果⭐测试1给t_config表插入数据测试2修改t_config表数据测试3删除t_config表数据 Canal整合SpringBoot详解一 什么是canal canal主要用途是基于 MySQL 数据库增量日志解析提供增量数据订阅和消费canal工作原理 canal的工作原理就是把自己伪装成MySQL slave从节点模拟MySQL slave的交互协议向MySQL Mater发送 dump协议MySQL mater收到canal发送过来的dump请求开始推送binary log给canal然后canal解析binary log再发送到存储目的地比如RocketMQ、Kafka、ElasticSearch等等。 canal能做什么 数据库镜像数据库实时备份索引构建和实时维护业务cache(缓存)刷新带业务逻辑的增量数据处理 搭建Kafka3.2.1集群⭐ Kafka集群机器规划 IP地址主机名需要安装的资源操作系统192.168.184.201kafka01jdk、Docker、zookeeper、Kafkacentos7.9192.168.184.202kafka02jdk、Docker、zookeeper、Kafkacentos7.9192.168.184.203kafka03jdk、Docker、zookeeper、Kafkacentos7.9 创建3台虚拟机centos7系统 必要的环境准备3台虚拟机都要执行如下操作⭐ 分别修改每个服务器的hosts文件将上面的ip和主机名配置上去 1进入hosts文件 vi /etc/hosts在最后面追加内容如下这个需要根据你自己服务器的ip来配置 192.168.184.201 kafka01 192.168.184.202 kafka02 192.168.184.203 kafka03分别关闭每个服务器的防火墙 systemctl stop firewalld systemctl disable firewalld分别为每个服务器安装jdk8 1进入oracle官网下载jdk8的tar.gz包 2将下载好的包上传到每个服务器上 3查看是否上传成功 [rootkafka01 ~]# ls anaconda-ks.cfg jdk-8u333-linux-x64.tar.gz4创建文件夹 mkdir -p /usr/java/5解压刚刚下载好的包并输出到/usr/java目录下 tar -zxvf jdk-8u333-linux-x64.tar.gz -C /usr/java/[rootkafka02 ~]# ls /usr/java/ jdk1.8.0_3336配置java环境变量 vi /etc/profile在文件中末尾添加如下配置需要更改的是JAVA_HOME根据自己的java目录名来更改 JAVA_HOME/usr/java/jdk1.8.0_333 CLASSPATH$JAVA_HOME/lib/ PATH$PATH:$JAVA_HOME/bin export PATH JAVA_HOME CLASSPATH7让配置立即生效 source /etc/profile8查看JDK是否安装成功 [rootkafka01 ~]# java -version java version 1.8.0_333 Java(TM) SE Runtime Environment (build 1.8.0_333-b02) Java HotSpot(TM) 64-Bit Server VM (build 25.333-b02, mixed mode)分别为每个服务器安装Docker 1切换镜像源 wget https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo -O /etc/yum.repos.d/docker-ce.repo2查看当前镜像源中支持的docker版本 yum list docker-ce --showduplicates | sort -r3安装特定版本的docker-ce yum -y install docker-ce-3:20.10.8-3.el7.x86_64 docker-ce-cli-3:20.10.8-3.el7.x86_64 containerd.io为每个节点的Docker接入阿里云镜像加速器 配置镜像加速器方法。 准备工作1首先进入阿里云容器镜像服务 https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors2点击镜像工具下面的镜像加速器3拿到你的加速器地址和下面第二步的registry-mirrors的值替换即可。 针对Docker客户端版本大于 1.10.0 的用户可以通过修改daemon配置文件/etc/docker/daemon.json来使用加速器 第一步 mkdir -p /etc/docker第二步 cat EOF /etc/docker/daemon.json {exec-opts: [native.cgroupdriversystemd], registry-mirrors: [https://u01jo9qv.mirror.aliyuncs.com,https://hub-mirror.c.163.com,https://mirror.baidubce.com],live-restore: true,log-driver:json-file,log-opts: {max-size:500m, max-file:3},max-concurrent-downloads: 10,max-concurrent-uploads: 5,storage-driver: overlay2 } EOF第三步 sudo systemctl daemon-reload第四步 sudo systemctl restart docker最后就接入阿里云容器镜像加速器成功啦。 为每个节点的docker设置开机自动启动 sudo systemctl enable docker分别为每个服务器安装zookeeper3.7.1搭建zookeeper集群⭐ 1在zookeeper官网上面下载zookeeper稳定版当前为3.7.1的tar.gz包并上传到每个服务器上 zookeeper官网 2查看刚刚上传的zookeeper包 [rootkafka01 ~]# pwd /root [rootkafka01 ~]# ls | grep zookeeper apache-zookeeper-3.7.1-bin.tar.gz3解压我们的zookeeper包 tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /usr/localmv /usr/local/apache-zookeeper-3.7.1-bin/ /usr/local/zookeepercd /usr/local/zookeeper4配置关于zookeeper的环境变量 vi /etc/profile在文件中末尾添加如下配置ZOOKEEPER_HOME需要根据你自己的zookeeper目录来配置 export ZOOKEEPER_HOME/usr/local/zookeeper export PATH$ZOOKEEPER_HOME/bin:$PATH5让配置立即生效 source /etc/profile6创建目录 cd /usr/local/zookeeper sudo mkdir data7添加配置 cd conf sudo vi zoo.cfg内容如下dataDir修改成自己的目录kafka01/02/03是我们在hosts配置的主机名映射相当于ip tickTime2000 initLimit10 syncLimit5 dataDir/usr/local/zookeeper/data clientPort2181 server.1kafka01:2888:3888 server.2kafka02:2888:3888 server.3kafka03:2888:3888initLimitZooKeeper集群模式下包含多个zk进程其中一个进程为leader余下的进程为follower。 当follower最初与leader建立连接时它们之间会传输相当多的数据尤其是follower的数据落后leader很多。initLimit配置follower与leader之间建立连接后进行同步的最长时间。 syncLimit配置follower和leader之间发送消息请求和应答的最大时间长度。 tickTimetickTime则是上述两个超时配置的基本单位例如对于initLimit其配置值为5说明其超时时间为 2000ms * 5 10秒。 server.idhost:port1:port2 其中id为一个数字表示zk进程的id这个id也是dataDir目录下myid文件的内容。host是该zk进程所在的IP地址port1表示follower和leader交换消息所使用的端口port2表示选举leader所使用的端口。 dataDir其配置的含义跟单机模式下的含义类似不同的是集群模式下还有一个myid文件。myid文件的内容只有一行且内容只能为1 - 255之间的数字这个数字亦即上面介绍server.id中的id表示zk进程的id。 8进入data目录 cd /usr/local/zookeeper/data/9对每个服务器kafka01、kafka02、kafka03配置myid文件91如果是kafka01服务器则执行下面这个下面的1、2、3就是我们上面指定的server.id每个zookeeper服务器都要有一个id并且全局唯一 echo 1 myid92如果是kafka02服务器则执行下面这个 echo 2 myid93如果是kafka03服务器则执行下面这个 echo 3 myid10启动zookeeper服务命令必须要把全部zookeeper服务器启动之后在执行下一步status命令 cd /usr/local/zookeeper/bin/[rootkafka01 bin]# zkServer.sh start ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg Starting zookeeper ... STARTED11对全部的zookeeper服务器执行查看zookeeper集群节点状态命令看看哪个是leader节点、哪个是follower节点。Mode就是某一台zookeeper的角色⭐ [rootkafka01 bin]# zkServer.sh status ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Client SSL: false. Mode: follower[rootkafka02 data]# zkServer.sh status ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Client SSL: false. Mode: leader[rootkafka03 data]# zkServer.sh status ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Client SSL: false. Mode: follower分别为每个服务器安装Kafka3.2.1搭建Kafka集群⭐ 1进入kafka官网 Kafka官网 2下载方式1下载当前kafka的Binary稳定版截止到2022-08-29稳定版本为3.2.1下载会十分缓慢大约要1个小时的时间假如你的网速很慢那么这种方式就不推荐了。 2下载方式2使用我上传kafka_2.13-3.2.1.zip包注意这个不是tgz包而是zip包推荐这种方式下载速度很快 kafka3.2.1快速下载地址 3解压kafka_2.13-3.2.1.zip包拿到kafka的tgz包 4将解压好的kafka的tgz包上传到每个服务器上。5查看每个服务器上是否都已经成功上传了kafka_2.13-3.2.1.tgz包 [rootkafka01 ~]# pwd /root [rootkafka01 ~]# ls | grep kafka kafka_2.13-3.2.1.tgz6解压kafka.tgz包到/usr/local下 tar -zxvf kafka_2.13-3.2.1.tgz -C /usr/local/7修改kafka目录 cd /usr/local/mv kafka_2.13-3.2.1/ kafka**8修改每个服务器的kafka配置文件注意对应的机器要执行对应的命令不是都在一台服务器执行**⭐ 81在kafka01服务器上修改的配置文件将下面的内容粘贴上去⭐ [rootkafka01 local]# rm -f /usr/local/kafka/config/server.properties [rootkafka01 local]# vi /usr/local/kafka/config/server.properties内容如下 注意下面3个地方 ①每一个kafka的broker.id都不可以一样并且要为数字比如0、1、2都是可以的 ②log.dirs为你当前机器的kafka的日志数据存储目录 ③zookeeper.connect配置连接Zookeeper集群地址下面的kafka01:2181kafka01的意思是zk所在的服务器的ip地址因为我们配置了hosts所以就直接用主机名更方便2181就是zk配置文件中的clientPort #broker 的全局唯一编号不能重复只能是数字。 broker.id1 #处理网络请求的线程数量 num.network.threads3 #用来处理磁盘 IO 的线程数量 num.io.threads8 #发送套接字的缓冲区大小 socket.send.buffer.bytes102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes102400 #请求套接字的缓冲区大小 socket.request.max.bytes104857600 #kafka 运行日志(数据)存放的路径路径不需要提前创建kafka 自动帮你创建可以配置多个磁盘路径路径与路径之间可以用分隔 log.dirs/usr/local/kafka/datas #topic 在当前 broker 上的分区个数 num.partitions1 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir1 # 每个 topic 创建时的副本数默认时 1 个副本 offsets.topic.replication.factor1 #segment 文件保留的最长时间超时将被删除 log.retention.hours168 #每个 segment 文件的大小默认最大 1G log.segment.bytes1073741824 # 检查过期数据的时间默认 5 分钟检查一次是否数据过期 log.retention.check.interval.ms300000 #配置连接Zookeeper集群地址在zk根目录下创建/kafka方便我们管理 zookeeper.connectkafka01:2181,kafka02:2181,kafka03:2181/kafka82在kafka02服务器上修改的配置文件将下面的内容粘贴上去⭐ [rootkafka02 local]# rm -f /usr/local/kafka/config/server.properties [rootkafka02 local]# vi /usr/local/kafka/config/server.properties内容如下 #broker 的全局唯一编号不能重复只能是数字。 broker.id2 #处理网络请求的线程数量 num.network.threads3 #用来处理磁盘 IO 的线程数量 num.io.threads8 #发送套接字的缓冲区大小 socket.send.buffer.bytes102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes102400 #请求套接字的缓冲区大小 socket.request.max.bytes104857600 #kafka 运行日志(数据)存放的路径路径不需要提前创建kafka 自动帮你创建可以配置多个磁盘路径路径与路径之间可以用分隔 log.dirs/usr/local/kafka/datas #topic 在当前 broker 上的分区个数 num.partitions1 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir1 # 每个 topic 创建时的副本数默认时 1 个副本 offsets.topic.replication.factor1 #segment 文件保留的最长时间超时将被删除 log.retention.hours168 #每个 segment 文件的大小默认最大 1G log.segment.bytes1073741824 # 检查过期数据的时间默认 5 分钟检查一次是否数据过期 log.retention.check.interval.ms300000 #配置连接Zookeeper集群地址在zk根目录下创建/kafka方便我们管理 zookeeper.connectkafka01:2181,kafka02:2181,kafka03:2181/kafka83在kafka03服务器上修改的配置文件将下面的内容粘贴上去⭐ [rootkafka03 local]# rm -f /usr/local/kafka/config/server.properties [rootkafka03 local]# vi /usr/local/kafka/config/server.properties内容如下 #broker 的全局唯一编号不能重复只能是数字。 broker.id3 #处理网络请求的线程数量 num.network.threads3 #用来处理磁盘 IO 的线程数量 num.io.threads8 #发送套接字的缓冲区大小 socket.send.buffer.bytes102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes102400 #请求套接字的缓冲区大小 socket.request.max.bytes104857600 #kafka 运行日志(数据)存放的路径路径不需要提前创建kafka 自动帮你创建可以配置多个磁盘路径路径与路径之间可以用分隔 log.dirs/usr/local/kafka/datas #topic 在当前 broker 上的分区个数 num.partitions1 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir1 # 每个 topic 创建时的副本数默认时 1 个副本 offsets.topic.replication.factor1 #segment 文件保留的最长时间超时将被删除 log.retention.hours168 #每个 segment 文件的大小默认最大 1G log.segment.bytes1073741824 # 检查过期数据的时间默认 5 分钟检查一次是否数据过期 log.retention.check.interval.ms300000 #配置连接Zookeeper集群地址在zk根目录下创建/kafka方便我们管理 zookeeper.connectkafka01:2181,kafka02:2181,kafka03:2181/kafka9给每个服务器都配置kafka的环境变量 sudo vim /etc/profile在最后面追加的内容如下 export KAFKA_HOME/usr/local/kafka export PATH$PATH:$KAFKA_HOME/bin10让配置立即生效 source /etc/profile11启动zk集群。依次在 kafka01、kafka02、kafka03节点上启动zookeeper。zk要先启动然后再启动kafka⭐ /usr/local/zookeeper/bin/zkServer.sh start12后台模式启动kafka集群。依次在 kafka01、kafka02、kafka03节点上启动kafka。 kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties13查看kafka是否启动成功 [rootkafka01 local]# jps 3603 Kafka 3166 QuorumPeerMain 4367 Jps14关闭kafka集群可以暂时不关闭方便后面继续演示 注意停止 Kafka 集群时一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper 集群。 kafka-server-stop.sh15等kafka集群全部关闭之后再关闭zookeeper可以暂时不关闭方便后面继续演示 zkServer.sh stop安装MySQL5.7配置canalmysql本次采用Docker的方式⭐ 1创建my.cnf文件也就是mysql的配置文件 vim /my-sql/mysql-master/conf/my.cnf将内容粘贴进my.cnf文件 [client] # 指定编码格式为utf8,默认的MySQL会有中文乱码问题 default_character_setutf8 [mysqld] collation_serverutf8_general_ci character_set_serverutf8# 全局唯一id不允许有相同的 server_id201 binlog-ignore-dbmysql # 指定MySQL二进制日志 log-binmysql-bin # 二进制日志格式因为要整合canal所以这里必须要是row binlog_formatrow #指定具体要同步的数据库如果不配置则表示所有数据库均开启 Binlog可以配置多个 binlog-do-dbcanal-test-db1 binlog-do-dbcanal-test-db22运行一个mysql容器实例。作为Master节点 docker run -p 3307:3306 \ -v /my-sql/mysql-master/log:/var/log/mysql \ -v /my-sql/mysql-master/data:/var/lib/mysql \ -v /my-sql/mysql-master/conf:/etc/mysql \ -e MYSQL_ROOT_PASSWORD123456 \ --name mysql-master \ -d mysql:5.73进入容器内部并登陆mysql docker exec -it mysql-master /bin/bash mysql -uroot -p4创建canal的mysql帐号使该canal帐号具有MySQL的Slave 从节点的权限也就是能够主从复制, 如果已有账户可直接 grant这几步都是在mysql容器内部进行也就是登录了mysql帐号后执行的命令 CREATE USER canal% IDENTIFIED BY canal; grant all privileges on *.* to canal% identified by canal; flush privileges;5退出mysql容器并重启容器 docker restart mysql-master6再次进入容器内部并登陆mysql docker exec -it mysql-master /bin/bash mysql -uroot -p7查看是否成功开启binlog日志 show variables like %log_bin%;案例1CanalKafka实现mysql和redis的数据同步⭐ 案例目的 1实现canal只监控canal-test-db1数据库下的t_config主要同步这个表和.t_user表。 2当我们修改canal-test-db1数据库下的t_config表的内容会自动同步到Redis中 3当我们修改canal-test-db1数据库下的t_user表则不会同步。虽然t_user表也被canal监控但是这个案例就要做到在被监控的情况下而不被同步说白了就是只同步t_config表。 必要的环境 1jdk82zookeeper3kafka4canal.deployer5Redis6Lombok 配置canal.deployer 1进入Canal的github仓库 Canal的github仓库地址 2选择canal.deployer的版本我们选择的是最新版v1.1.6 21方式1直接从GitHub上面下载。下载速度十分慢不推荐 22方式2从我的csdn上面下载。速度很快推荐⭐ canal.deployer快速下载地址 3上传到我们的服务器上这里我们就拿kafka01服务器作为canal服务器生产环境可以另外创建一个新的canal服务器。 4查看canal是否上传到我们的服务器上只上传到kafka01服务器上 [rootkafka01 ~]# ls | grep canal canal.deployer-1.1.6.tar.gz5解压canal.deployer mkdir -p /usr/local/canal-deployertar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-deployercd /usr/local/canal-deployer/conf6修改canal.properties文件 vim /usr/local/canal-deployer/conf/canal.properties修改地方1配置zookeeper集群地址 修改地方2修改成kafka模式 修改地方3修改canal数据库用户的账号密码 修改地方4在conf目录下要有example同名的目录可以默认不改意思就是instance.properties在/usr/local/canal-deployer/conf/example目录下。 例如要将example改成abc1则也要在conf目录下创建一个abc1的目录并在里面创建instance.properties配置文件。 修改地方5配置kafka集群地址 7配置instance.properties配置文件默认是在example目录下 cd /usr/local/canal-deployer/conf/examplevim instance.properties修改地方1。canal数据库的id必须要全局唯一和mysql的id不能设置一样 修改地方2。我们MySQL的master数据库的ip端口我们上面设置mysql的是3307端口 修改地方3。在MySQL的master数据库中canal的账号密码 修改地方4。新增一个配置设置默认同步的数据库名⭐ canal.instance.defaultDatabaseName 监控的数据库名 例如canal.instance.defaultDatabaseNamecanal-test-db1 修改地方5匹配表名的正则表达式指定canal要监控的数据库.表名很重要⭐ canal.instance.filter.regexcanal-test-db1.t_config,canal-test-db1.t_user 修改地方6指定用于canal传输消息的kafka的topic名称我们指定的topic名称为canal-test-topic 启动canal.deployer⭐ 1跳转目录 cd /usr/local/canal-deployer/bin/2执行sh ./startup.sh配置hosts由于我是Windows运行没有配置hosts导致无法识别kafka01主机名⭐ 修改C:\Windows\System32\drivers\etc路径下的hosts文件 创建一个SpringBoot项目⭐ 项目结构 准备需要同步的数据库表⭐ CREATE DATABASE canal-test-db1;USE canal-test-db1;CREATE TABLE t_config (config_id bigint(20) NOT NULL,config_info text,datetime datetime DEFAULT NULL,desc varchar(255) DEFAULT NULL,PRIMARY KEY (config_id) USING BTREE ) ENGINEInnoDB DEFAULT CHARSETutf8 ROW_FORMATDYNAMIC;pom.xml⭐ ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdorg.example/groupIdartifactIdcanal-demo/artifactIdversion1.0-SNAPSHOT/versionproperties!-- springboot版本--spring-boot.version2.5.9/spring-boot.versionmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.target/propertiesdependencyManagementdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-dependencies/artifactIdversion${spring-boot.version}/versiontypepom/typescopeimport/scope/dependency/dependencies/dependencyManagementdependencies!-- spring整合kafka依赖--dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependency!-- Redis服务启动器 --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId/dependency!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --dependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.70/version/dependency!-- springboot-web依赖--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- lombok--dependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.12/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependency/dependencies/projectapplication.yml⭐ server:port: 8081 # spring整合kafka配置 spring:kafka:# kafka集群地址(可以多个)bootstrap-servers:- 192.168.184.201:9092- 192.168.184.202:9092- 192.168.184.203:9092#kafka消费者配置consumer:# 指定一个消费者组idgroup-id: canal-group1# key/value的反序列化key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer#手动提交第1步开启手动提交offsettrue的话就是消费完一条消息自动会提交enable-auto-commit: false# kafka生产者配置producer:# key/value的序列化key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerlistener:#手动提交第2步ack设置为手动enable-auto-commit要设置为false# manual_immediate每处理完业务手动调用Acknowledgment.acknowledge()后立即提交ack-mode: manual_immediate#redis配置redis:host: 127.0.0.1# password:port: 6379database: 2RedisTemplateConfig配置类 package com.boot.config;import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer;Configuration public class RedisTemplateConfig {Beanpublic RedisTemplateString, Object redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplateString, Object redisTemplate new RedisTemplate();redisTemplate.setConnectionFactory(redisConnectionFactory);StringRedisSerializer stringRedisSerializer new StringRedisSerializer();Jackson2JsonRedisSerializer jsonRedisSerializer new Jackson2JsonRedisSerializer(Object.class);redisTemplate.setKeySerializer(stringRedisSerializer);redisTemplate.setHashKeySerializer(stringRedisSerializer);redisTemplate.setValueSerializer(jsonRedisSerializer);redisTemplate.setHashValueSerializer(jsonRedisSerializer);// 解决查询缓存转换异常的问题ObjectMapper om new ObjectMapper();// 指定要序列化的域field,get和set,以及修饰符范围ANY是都有包括private和publicom.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);// 指定序列化输入的类型类必须是非final修饰的final修饰的类比如String,Integer等会跑出异常om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jsonRedisSerializer.setObjectMapper(om); //如果不设置存储到redis的对象取出来将无法进行转换redisTemplate.setDefaultSerializer(jsonRedisSerializer);return redisTemplate;} }Config.class⭐ package com.boot.entity;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; import java.io.Serializable;/*** 该实体类对应着数据库表t_config字段* author youzhengjie 2022-09-01 16:55:40*///lombok注解简化开发 Data AllArgsConstructor NoArgsConstructor Accessors(chain true) //开启链式编程 public class Config implements Serializable {private Long configId;private String configInfo;private String datetime;private String desc; }canal要求必须要的实体类用于接收canal发送到kafka的同步消息⭐ ConfigCanalBean.class⭐ package com.boot.entity.config_canal;import com.boot.entity.Config; import lombok.Data;import java.util.List;/*** 这个类是接收canal发送过来的消息所必须要的* author youzhengjie 2022-09-01 16:55:18*/ Data public class ConfigCanalBean {//config实体类的数据private ListConfig data;//数据库名称private String database;private long es;//递增private int id;//是否是DDL语句private boolean isDdl;//表结构的字段类型private MysqlType mysqlType;//UPDATE语句旧数据private String old;//主键名称private ListString pkNames;//sql语句private String sql;//暂时没发现什么用不过也要写上这个属性private SqlType sqlType;//表名private String table;private long ts;//(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等private String type;//getter、setter方法 }MysqlType.class⭐ package com.boot.entity.config_canal;import lombok.Data;/*** 和SqlType类差不多。就是把我们要同步的t_config数据库表的字段全部复制到这里然后全部改成String类型即可* 下面的属性名支持驼峰法。比如下面的configId属性可以和t_config表的config_id一一对应而无需更改。* 注意这个类的属性全部都要是String类型* author youzhengjie 2022-09-01 16:55:25*/ Data public class MysqlType {private String configId;private String configInfo;private String datetime;private String desc; }SqlType.class⭐ package com.boot.entity.config_canal;import lombok.Data;/*** 和MysqlType类差不多。就是把我们要同步的t_config数据库表的字段全部复制到这里然后全部改成int类型即可* 下面的属性名支持驼峰法。比如下面的configId属性可以和t_config表的config_id一一对应而无需更改。* 注意这个类的属性全部都要是int类型* author youzhengjie 2022-09-01 16:55:32*/ Data public class SqlType {private int configId;private int configInfo;private int datetime;private int desc;}ConfigCanalRedisConsumerkafka消费者类监听指定topic把canal发送的消息同步到Redis中⭐ package com.boot.comsumer;import com.alibaba.fastjson.JSONObject; import com.boot.entity.Config; import com.boot.entity.config_canal.ConfigCanalBean; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component;import java.util.List; import java.util.concurrent.TimeUnit;/*** kafka消费者监听名为canal-test-topic的topic同步Redis* author youzhengjie 2022-09-01 16:54:28*/ Component Slf4j public class ConfigCanalRedisConsumer {Autowiredprivate RedisTemplate redisTemplate;//redis的key格式数据库.表名_字段的idprivate static final String KEY_PREFIX canal-test-db1.t_config_;//过期时间单位小时private static final int TIME_OUT 24;/*** param consumer 接收消费记录消息* param ack 手动提交消息*/KafkaListener(topics canal-test-topic)public void receive(ConsumerRecordString, String consumer, Acknowledgment ack) {try {//获取canal的消息String value (String) consumer.value();log.info(topic名称:{},key:{},分区位置:{},下标:{},value:{}, consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), value);//转换为javaBeanConfigCanalBean canalBean JSONObject.parseObject(value, ConfigCanalBean.class);/*由于我们canal.instance配置了监控canal-test-db1.t_config表和canal-test-db1.t_user表生产环境下可以启动多个canal每一个canal监听一张需要同步的表所以我们要对这两张表分开处理。可以通过他们的表名canalBean.getTable()来区分如果canalBean.getTable()获取的表名是t_config则同步到redis如果不是则不管。*///System.out.println(canalBean);if(t_config.equals(canalBean.getTable())){//获取是否是DDL语句boolean isDdl canalBean.isDdl();//获取当前sql语句的类型比如INSERT、DELETE等等String type canalBean.getType();ListConfig configList canalBean.getData();//如果不是DDL语句if (!isDdl) {//INSERT和UPDATE都是一样的操作if (INSERT.equals(type) || UPDATE.equals(type)) {//新增语句for (Config config : configList) {Long id config.getConfigId();//新增到redis中,过期时间是10分钟redisTemplate.opsForValue().set(KEY_PREFIX id, JSONObject.toJSONString(config), TIME_OUT, TimeUnit.HOURS);}}else if(DELETE.equals(type)){//删除语句for (Config config : configList) {Long id config.getConfigId();//从redis中删除redisTemplate.delete(KEY_PREFIXid);}}}}//最后如果上面的代码没有报错的情况下可以确认消息了。很重要ack.acknowledge();}catch (Exception e){throw new RuntimeException();}}}创建kafka的topic我们指定的topic名称为canal-test-topic /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server kafka01:9092 --topic canal-test-topic --create开始测试canal同步效果⭐ 测试1给t_config表插入数据 测试2修改t_config表数据 测试3删除t_config表数据
http://www.zqtcl.cn/news/772708/

相关文章:

  • 一站式装修的利弊上海建设厅焊工证查询网站
  • 济宁做网站公司找融合深圳招聘一般在哪个网站
  • 重庆建网站推广公司个人网站需要建站群吗
  • 深圳网站建设吗个人博客网站制作代码
  • 化妆品网站模板网络营销的网站分类有哪些
  • 广州网站建设程序员培训wordpress 微信 抓取
  • 毕设给学校做网站个人店铺logo
  • 中国做w7的网站宿迁网站建设价位
  • 网站建设售后服务合同百度关键词排名点击器
  • 编辑网站用什么软件推广是什么
  • 北京模板开发建站做网站赚钱的点在哪里
  • 网站建设价格兴田德润i网址多少wordpress主题汉化是什么意思
  • 用最少的钱做网站根据域名查询网站名称
  • 网站开发答辩难点网站返回按钮设计
  • 鹤壁做网站优化建设银行理财产品网站
  • 电子商务类网站模板自学网站建设基本流程
  • 无锡网站制作的公司上海企业服务公司
  • 做h5小程序的网站搜索引擎营销案例
  • 订餐网站开发方案查询网站是否正规
  • 建站论坛图片生成器免费
  • 怎么做自己的店铺网站博物馆门户网站建设优势
  • 专业旅游培训网站建设应用之星 wordpress
  • 青海媒体网站建设公司深圳网站建设推广优化公司
  • 网站开发 价格跨境支付互联互通
  • 织梦 修改网站logo营销型网站设计的内容
  • 电商网站运营策划做网站CentOS还是win好
  • 小型企业网站模板企业网站seo点击软件
  • 提供邯郸企业建网站网站图片上怎么做弹幕效果
  • 滨州做网站的wordpress如何添加商桥
  • 网站登录密码忘记网站开发营业执照申请