北京比较好的网站公司,信宜手机网站建设公司,wordpress维护页面,江门网站制作维护Redis学习——高级篇④ Redis7高级之Redis与Mysql数据双写一致性工程案例#xff08;四#xff09; 4.1 MySQL主从复制原理4.2 canal 工作原理4.3 mySQL-canal-redis 双写一致性1.环境2.配置Mysql3.配置canal4. Canal客户端#xff08;Java编写#xff0… Redis学习——高级篇④ Redis7高级之Redis与Mysql数据双写一致性工程案例四 4.1 MySQL主从复制原理4.2 canal 工作原理4.3 mySQL-canal-redis 双写一致性1.环境2.配置Mysql3.配置canal4. Canal客户端Java编写1.SQL脚本(随便找个数据库)2.建Module3.改POM4.改YML5.启动类6.业务类 Redis7高级之Redis与Mysql数据双写一致性工程案例四
4.1 MySQL主从复制原理 MySQL的主从复制
当Master主服务器上的数据发生改变时将其改变写入二进制事件日志文件中Slave 从服务器会在一定时间间隔内对master 主服务器上的二进制日志进行探测探测其是否发生过改变 如果探测到 master 主服务器的二进制时间发生了改变则开始一个 I/O Thread 请求 master 二进制事件日志 同时 master 主服务器为每个 I/O Thread 启动一个 dump Thread用于向其发送二进制事件日志slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中slave 从服务器将启动 SQL Thread 从中继日志中读取二进制日志在本地重放使得其数据和主服务器保持一致最后 I/O Thread 和 SQL Thread 将进入睡眠状态等待下一次被唤醒
4.2 canal 工作原理 canal 模拟 Mysql slave 的交互协议将自己作为 Mysql slave 向 Mysql master 发送 dump 协议MySQL master 收到 dump 请求开始推送 binary log 给 slave即cancalcanal 解析 binary log 对象原始为 byte 流
4.3 mySQL-canal-redis 双写一致性
1.环境
linux的mysql、canal、redis其中canal和redis在一台机器mysql单独一个机器
2.配置Mysql 别忘了 开启start net mysql80 查看mysql版本 当前的主机二进制日志——show master status; 查看SHOW VARIABLES LIKE log_bin; 如果是on跳过下一步 开启MySQL的binlog写入功能 做好备份
my.cnf
[client]
default_character_setutf8[mysqld]
log-binmysql-bin #开启 binlog
binlog-formatROW #选择 ROW 模式
server_id1 #配置MySQL replaction需要定义不要和canal的 slaveId重复
collation_server utf8_general_ci
character_set_server utf8修改 重启mysql 再次查看SHOW VARIABLES LIKE log _bin; 授权canal连接MySQL账号 mysql默认的用户在mysq|库的user表里 默认没有canal账户此处新建授权
DROP USER IF EXISTS canal%;
CREATE USER canal% IDENTIFIED BY canal;
GRANT ALL PRIVILEGES ON *.* TO canal% IDENTIFIED BY canal;
FLUSH PRIVILEGES;SELECT * FROM mysql.user;上面是教学给的 但是我报错 我这样写的 才过的 看大家自己测试的时候也希望大家要到原因留言告诉俺一声 3.配置canal
canal服务端 下载canal.deployer-1.1.7.tar.gz Releases · alibaba/canal (github.com) 解压 tar -zxvf canal.deployer-1.1.7.tar.gz 配置 修改 /mycanal/conf/example 路径下的 instance.properties 文件 启动 在 /mycanal/bin 路径下执行 ./startup.sh 必须要有 java 8 的环境
等我去装一下 java的环境 看是否成功启动报错记录如果canal启动后没有这两个文件多半是内存不够了记着扩一下内存
你可以通过以下步骤来查看 Canal 使用的 Java 是 32 位还是 64 位 查看Canal启动脚本 打开 Canal 启动脚本通常是 .sh 或 .bat 文件寻找包含 Java 启动命令的行。在这一行中你应该能够找到 Java 的路径。 如果是 Linux 系统启动脚本可能以 .sh 结尾。打开该文件找到启动 Canal 的地方。 如果是 Windows 系统启动脚本可能以 .bat 结尾。同样打开该文件找到启动 Canal 的地方。 在启动脚本中找到 Java 启动命令 查找包含 Java 启动命令的那一行这可能类似于 java -server -Xms512m -Xmx1024m -XX:MaxPermSize256m -Djava.awt.headlesstrue ...或 java -server -Xms512m -Xmx1024m -XX:MaxPermSize256m -Djava.awt.headlesstrue ...这里的 java 后面就是 Java 的执行路径。 查看 Java 的执行路径 执行路径应该类似于 /path/to/java/bin/java 或 C:\path\to\java\bin\java.exe。在这个路径中你可以看到 Java 的安装目录。 查看 Java 的位数 进入 Java 安装目录找到 java 可执行文件在 Linux 上可能是 java在 Windows 上可能是 java.exe。然后在命令行中运行以下命令 在 Linux 上 file /path/to/java/bin/java在 Windows 上 file C:\path\to\java\bin\java.exe这将显示 Java 可执行文件的详细信息包括 32 位还是 64 位。
通过以上步骤你应该能够确定 Canal 使用的 Java 是 32 位还是 64 位。如果你发现它使用了不正确的 Java 版本你可能需要更改启动脚本或配置以使用正确版本的 Java。
如果下图是这样也就是java版本太高了 “Unrecognized VM option ‘AggressiveOpts’” 错误通常发生在Java虚拟机JVM遇到不认识的选项时。在这种情况下似乎选项 “AggressiveOpts” 在你使用的JVM中不被支持。
“AggressiveOpts” 是较旧版本Java中的一个实验性选项但在最近的Java版本中已被移除。如果你使用的是较新版本的Java这个选项可能不再受支持。
解决这个问题的步骤如下 删除 ‘AggressiveOpts’ 选项 打开用于启动Canal的脚本或配置文件并找到Java命令行。从命令行中删除 “-XX:AggressiveOpts” 选项。 删除 ‘UseBiasedLocking’ 选项 打开用于启动Canal的脚本或配置文件并找到Java命令行。从命令行中删除 “-XX:UseBiasedLocking” 选项。 检查Java版本 确保你使用的是一个受支持的Java版本。你可以通过在终端或命令提示符中运行以下命令来检查你的Java版本 java -version如果你使用的是太旧或太新的Java版本请考虑更新或降级到与Canal兼容的版本。 检查JVM选项 检查并更新Canal启动脚本或配置中的其他JVM选项以确保它们与你使用的Java版本兼容。
如果问题仍然存在或者如果你遇到其他问题请提供有关你的环境、所使用的Java版本以及任何相关的Canal配置文件或启动脚本的更多详细信息以便我能够提供更具体的帮助。 查看server日志 tmd 终于好了 查看样例example的日志 4. Canal客户端Java编写 Redis用RedisTemplate
1.SQL脚本(随便找个数据库)
CREATE TABLE t_user (id BIGINT(20) NOT NULL AUTO_INCREMENT,userName VARCHAR(100) NOT NULL,PRIMARY KEY (id)) ENGINEINNODB AUTO_INCREMENT10 DEFAULT CHARSETutf8mb42.建Module
canal_demo
3.改POM
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.5.14/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdcom.xfcy/groupIdartifactIdcanal_demo/artifactIdversion0.0.1-SNAPSHOT/versionnamecanal_demo/namedescriptioncanal_demo/descriptionpropertiesproject.build.sourceEncodingUTF-8/project.build.sourceEncodingmaven.compiler.source1.8/maven.compiler.sourcemaven.compiler.target1.8/maven.compiler.targetjunit.version4.12/junit.versionlog4j.version1.2.17/log4j.versionlombok.version1.16.18/lombok.versionmysql.version5.1.47/mysql.versiondruid.version1.1.16/druid.versionmapper.version4.1.5/mapper.versionmybatis.spring.boot.version1.3.0/mybatis.spring.boot.version/propertiesdependencies!--canal--dependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.client/artifactIdversion1.1.0/version/dependency!--SpringBoot通用依赖模块--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-actuator/artifactId/dependency!--swagger2--dependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger2/artifactIdversion2.9.2/version/dependencydependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger-ui/artifactIdversion2.9.2/version/dependency!--SpringBoot与Redis整合依赖--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId/dependencydependencygroupIdorg.apache.commons/groupIdartifactIdcommons-pool2/artifactId/dependency!--SpringBoot与AOP--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-aop/artifactId/dependencydependencygroupIdorg.aspectj/groupIdartifactIdaspectjweaver/artifactId/dependency!--Mysql数据库驱动--dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.47/version/dependency!--SpringBoot集成druid连接池--dependencygroupIdcom.alibaba/groupIdartifactIddruid-spring-boot-starter/artifactIdversion1.1.10/version/dependencydependencygroupIdcom.alibaba/groupIdartifactIddruid/artifactIdversion${druid.version}/version/dependency!--mybatis和springboot整合--dependencygroupIdorg.mybatis.spring.boot/groupIdartifactIdmybatis-spring-boot-starter/artifactIdversion${mybatis.spring.boot.version}/version/dependency!--通用基础配置junit/devtools/test/log4j/lombok/hutool--!--hutool--dependencygroupIdcn.hutool/groupIdartifactIdhutool-all/artifactIdversion5.2.3/version/dependencydependencygroupIdjunit/groupIdartifactIdjunit/artifactIdversion${junit.version}/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdlog4j/groupIdartifactIdlog4j/artifactIdversion${log4j.version}/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion${lombok.version}/versionoptionaltrue/optional/dependency!--persistence--dependencygroupIdjavax.persistence/groupIdartifactIdpersistence-api/artifactIdversion1.0.2/version/dependency!--通用Mapper--dependencygroupIdtk.mybatis/groupIdartifactIdmapper/artifactIdversion${mapper.version}/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-autoconfigure/artifactId/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build/project4.改YML
server.port5555# alibaba.druid
spring.datasource.typecom.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-namecom.mysql.jdbc.Driver
spring.datasource.urljdbc:mysql://192.168.238.130:3306/bigdata?useUnicodetruecharacterEncodingutf-8useSSLfalse
spring.datasource.usernameroot
spring.datasource.password123456
spring.datasource.druid.test-while-idlefalse5.启动类
这个是一个模板不用启动
package com.xfcy.canal_demo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication
public class CanalDemoApplication {public static void main(String[] args) {//SpringApplication.run(CanalDemoApplication.class, args);}}6.业务类
RedisUtils类
package com.xfcy.canal_demo.util;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;public class RedisUtils {public static final String REDIS_IP_ADDR 192.168.238.110;public static final String REDIS_pwd 111111;public static JedisPool jedisPool;static {JedisPoolConfig jedisPoolConfignew JedisPoolConfig();jedisPoolConfig.setMaxTotal(20);jedisPoolConfig.setMaxIdle(10);jedisPoolnew JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000,REDIS_pwd);}public static Jedis getJedis() throws Exception {if(null!jedisPool){return jedisPool.getResource();}throw new Exception(Jedispool is not ok);}
}RedisCanalClientExample
package com.xfcy.canal_demo.biz;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;import com.xfcy.canal_demo.util.RedisUtils;
import redis.clients.jedis.Jedis;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;public class RedisCanalClientExample {public static final Integer _60SECONDS 60;public static final String REDIS_IP_ADDR 192.168.238.130;private static void redisInsert(ListColumn columns){JSONObject jsonObject new JSONObject();for (Column column : columns){System.out.println(column.getName() : column.getValue() update column.getUpdated());jsonObject.put(column.getName(),column.getValue());}if(columns.size() 0){try(Jedis jedis RedisUtils.getJedis()){jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());}catch (Exception e){e.printStackTrace();}}}private static void redisDelete(ListColumn columns){JSONObject jsonObject new JSONObject();for (Column column : columns){jsonObject.put(column.getName(),column.getValue());}if(columns.size() 0){try(Jedis jedis RedisUtils.getJedis()){jedis.del(columns.get(0).getValue());}catch (Exception e){e.printStackTrace();}}}private static void redisUpdate(ListColumn columns){JSONObject jsonObject new JSONObject();for (Column column : columns){System.out.println(column.getName() : column.getValue() update column.getUpdated());jsonObject.put(column.getName(),column.getValue());}if(columns.size() 0){try(Jedis jedis RedisUtils.getJedis()){jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());System.out.println(---------update after: jedis.get(columns.get(0).getValue()));}catch (Exception e){e.printStackTrace();}}}public static void printEntry(ListEntry entrys){for (Entry entry : entrys) {if (entry.getEntryType() EntryType.TRANSACTIONBEGIN || entry.getEntryType() EntryType.TRANSACTIONEND) {continue;}RowChange rowChage null;try {//获取变更的row数据rowChage RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException(ERROR ## parser of eromanga-event has an error,data: entry.toString(),e);}//获取变动类型EventType eventType rowChage.getEventType();System.out.println(String.format(gt; binlog[%s:%s] , name[%s,%s] , eventType : %s,entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType EventType.INSERT) {redisInsert(rowData.getAfterColumnsList());} else if (eventType EventType.DELETE) {redisDelete(rowData.getBeforeColumnsList());} else {//EventType.UPDATEredisUpdate(rowData.getAfterColumnsList());}}}}public static void main(String[] args){System.out.println(---------O(∩_∩)O哈哈~ initCanal() main方法-----------);//// 创建链接canal服务端CanalConnector connector CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR,11111), example, , ); // 这里用户名和密码如果在这写了会覆盖canal配置文件的账号密码如果不填从配置文件中读int batchSize 1000;//空闲空转计数器int emptyCount 0;System.out.println(---------------------canal init OK开始监听mysql变化------);try {connector.connect();//connector.subscribe(.*\\..*);connector.subscribe(db01.t_user); // 设置监听哪个表connector.rollback();int totalEmptyCount 10 * _60SECONDS;while (emptyCount totalEmptyCount) {System.out.println(我是canal每秒一次正在监听: UUID.randomUUID().toString());Message message connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId message.getId();int size message.getEntries().size();if (batchId -1 || size 0) {emptyCount;try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }} else {//计数器重新置零emptyCount 0;printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println(已经监听了totalEmptyCount秒无任何消息请重启重试......);} finally {connector.disconnect();}}
}java程序下connectors.subscribe 配置的过滤正则
AB全库全表connector.subscribe(. *\\..*)指定库全表connector.subscribe(test1..*)单表connector.subscribe(test.user)多规则组合使用connector.subscribe(test\\..*,test2.user1,test3.user2)
关闭资源代码简写
try-with-resource释放资源语法糖
jdk1.7后增加了try-with-resources,他是一个声明一个或多个资源的ty语句。一个资源作为一个对象必须在程序结束之后关闭。try-with-resources语句确保在语句的最后每个资源都被关闭任何实现了java.lang AutoCloseable和java.io.Closeable的对象都可以使用try-with-resource来实现异常处理Q和关闭资源。