广元市城乡规划建设监察大队网站,住房和城乡建设部课题网站,江苏省建设厅网站官网,民治营销型网站canal 数据异构组件 为啥要使用这个组件#xff1f; 在更新DB的时候不同步更新到redis#xff0c;es等数据库中#xff0c;时间太久#xff0c;而且可能会存在同步失败的问题#xff0c;因此引入canal去拉取DB的数据#xff0c;再去更新到redis#xff0c;es等数据库中 在更新DB的时候不同步更新到redises等数据库中时间太久而且可能会存在同步失败的问题因此引入canal去拉取DB的数据再去更新到redises等数据库中有失败重试和回滚等功能。 canal原理 canal 伪装成salve向mysql发送dump协议拿到备份数据binlog,去更新数据到redises等数据库中或者通过组装数据之后更新。canal可以拿到更新前的所有数据更新后的所有数据更新了哪些数据 canal 组件的使用
1.下载canal组件
下载地址canal组件下载地址 在我的资源中也有canal组件包 解压启动我是windows版双击startup.bat 2.数据库配置 1.开启MySQL , 需要先开启 Binlog 写入功能 [mysqld]
log-binmysql-bin # 开启 binlog
binlog-formatROW # 选择 ROW 模式
server_id1 # 配置 MySQL replaction 需要定义不要和 canal 的 slaveId 重复2.授权 canal 作为mysql 的slave 的权限 CREATE USER canal IDENTIFIED BY canal;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal%;
-- GRANT ALL PRIVILEGES ON *.* TO canal% ;
FLUSH PRIVILEGES;3.项目引入jar包
dependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.client/artifactIdversion1.1.4/version
/dependency4.写canal监听数据工具类
package com.next.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;
import java.util.List;public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), example, , );int batchSize 1000;int emptyCount 0;try {connector.connect();connector.subscribe(.*\\..*);connector.rollback();int totalEmptyCount 120;while (emptyCount totalEmptyCount) {Message message connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId message.getId();int size message.getEntries().size();if (batchId -1 || size 0) {emptyCount;System.out.println(empty count : emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount 0;// System.out.printf(message[batchId%s,size%s] \n, batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println(empty too many times, exit);} finally {connector.disconnect();}}private static void printEntry(ListCanalEntry.Entry entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage null;try {rowChage CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException(ERROR ## parser of eromanga-event has an error , data: entry.toString(),e);}CanalEntry.EventType eventType rowChage.getEventType();System.out.println(String.format(gt; binlog[%s:%s] , name[%s,%s] , eventType : %s,entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType CanalEntry.EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType CanalEntry.EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println(-------gt; before);printColumn(rowData.getBeforeColumnsList());System.out.println(-------gt; after);printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(ListCanalEntry.Column columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() : column.getValue() update column.getUpdated());}}
}
5.简单例子使用测试 1.数据库更改user_id从0改为1再从1改为0 2.查看canal监测的数据canal可以拿到更新前的所有数据更新后的所有数据更新了哪些数据 6.进一步完善canal监听数据工具类用于应用例子 1.加入监听器项目启动时启动 2.使用线程去监听数据 3.替换掉system.out.print()里面有锁会阻塞使用日志打印 4.处理canal监测到的数据 package com.next.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.next.dao.TrainNumberDetailMapper;
import com.next.service.TrainNumberService;
import com.next.service.TrainSeatService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.List;/*** desc 不要用system.out.print()里面有锁会阻塞用日志打印*/
Service
Slf4j
public class CanalSubscribe implements ApplicationListenerContextRefreshedEvent {Resourceprivate TrainSeatService trainSeatService;Resourceprivate TrainNumberService trainNumberService;//监听启动的时候就开始调用此监听方法Overridepublic void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {canalSubscribe();}private void canalSubscribe() {// 创建链接CanalConnector connector CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), example, , );int batchSize 1000;//使用线程new Thread(() - {try {log.info(canal subscribe);connector.connect();connector.subscribe(.*\\..*);connector.rollback();while (true) {Message message connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId message.getId();int size message.getEntries().size();if (batchId -1 || size 0) {//没有取到数据继续safeSleep(100);continue;}try {log.info(new message,batchIds:{},size:{}, batchId, batchSize);//打印日志printEntry(message.getEntries());// 提交确认connector.ack(batchId);} catch (Exception e2) {log.error(canal data exception,batchIds:{}, batchId, e2);// 处理失败, 回滚数据connector.rollback(batchId);}}} catch (Exception e3) {log.error(canal subscribe exception, e3);safeSleep(1000);canalSubscribe();}}).start();}private void printEntry(ListCanalEntry.Entry entrys) throws Exception{for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage null;try {rowChage CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException(RowChange.parse Exception , data: entry, e);}//更新类型-更新删除新增CanalEntry.EventType eventType rowChage.getEventType();//数据库名String schemaName entry.getHeader().getSchemaName();//表名String tableName entry.getHeader().getTableName();log.info(name:[{},{}],eventType:{},schemaName,tableName,eventType);for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType CanalEntry.EventType.DELETE) {handleColumn(rowData.getBeforeColumnsList(), eventType, schemaName, tableName);} else {handleColumn(rowData.getAfterColumnsList(), eventType, schemaName, tableName);}}}}//处理canal监测到的数据private void handleColumn(ListCanalEntry.Column columnsList, CanalEntry.EventType eventType, String schemaName, String tableName) throws Exception{if(schemaName.contains(12306_seat_)){//处理座位变更trainSeatService.handle(columnsList,eventType);}else if(tableName.equals(train_number)){//车次详情处理(实际上是车次信息变更之后才批量处理车次详情)trainNumberService.handle(columnsList,eventType);}else{log.info(drop data,no need care);}}private void safeSleep(int millis) {try {Thread.sleep(100);} catch (Exception e1) {}}}
处理canal监测到的数据拿到改变的数据放到实体类中存到redis中
package com.next.service;import com.alibaba.otter.canal.protocol.CanalEntry;
import com.next.dao.TrainNumberMapper;
import com.next.model.TrainNumber;
import com.next.model.TrainSeat;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.List;Service
Slf4j
public class TrainSeatService {Resourceprivate TrainNumberMapper trainNumberMapper;Resourceprivate TrainCacheService trainCacheService;//处理座位canal通过监听座位库拿到改变的数据放到实体类中public void handle(ListCanalEntry.Column columns, CanalEntry.EventType eventType) {if (eventType ! CanalEntry.EventType.UPDATE) {log.info(not update,no need care);return;}TrainSeat trainSeat new TrainSeat();boolean isStatusUpdated false;for (CanalEntry.Column column : columns) {//票的状态改变了才做下面的操作if (column.getName().equals(status)) {trainSeat.setStatus(Integer.parseInt(column.getValue()));if (column.getUpdated()) {isStatusUpdated true;} else {break;}} else if (column.getName().equals(id)) {trainSeat.setId(Long.parseLong(column.getValue()));} else if (column.getName().equals(carriage_number)) {trainSeat.setCarriageNumber(Integer.parseInt(column.getValue()));} else if (column.getName().equals(row_number)) {trainSeat.setRowNumber(Integer.parseInt(column.getValue()));} else if (column.getName().equals(seat_number)) {trainSeat.setSeatNumber(Integer.parseInt(column.getValue()));} else if (column.getName().equals(train_number_id)) {trainSeat.setTrainNumberId(Integer.parseInt(column.getValue()));} else if (column.getName().equals(ticket)) {trainSeat.setTicket(column.getValue());} else if (column.getName().equals(from_station_id)) {trainSeat.setFromStationId(Integer.parseInt(column.getValue()));} else if (column.getName().equals(to_station_id)) {trainSeat.setToStationId(Integer.parseInt(column.getValue()));}}if (!isStatusUpdated) {log.info(status not update,no need care);}log.info(train seat update,trainSeat:{}, trainSeat);/*** 数据存到redis* 1.指定座位被占hash* cacheKey:车次_日期 D386_20231001* field: carriage_row_seat_fromStationId_toStationId* value: 0-空闲 1-占座** 2.每个座位详情剩余的座位数* cacheKey: 车次_日期_count D386_20231001_count* field: fromStationId_toStationId* value: 实际座位数**/TrainNumber trainNumber trainNumberMapper.selectByPrimaryKey(trainSeat.getTrainNumberId());//放票if (trainSeat.getStatus() 1) {trainCacheService.hset(trainNumber.getName() _ trainSeat.getTicket(),trainSeat.getCarriageNumber() _ trainSeat.getRowNumber() _ trainSeat.getSeatNumber() _ trainSeat.getFromStationId() _ trainSeat.getToStationId(),0);trainCacheService.hincr(trainNumber.getName() _ trainSeat.getTicket() _count,trainSeat.getFromStationId() _ trainSeat.getToStationId(),1l);log.info(seat1,trainNumber:{},trainSeat:{}, trainNumber, trainSeat);//占票} else if (trainSeat.getStatus() 2) {trainCacheService.hset(trainNumber.getName() _ trainSeat.getTicket(),trainSeat.getCarriageNumber() _ trainSeat.getRowNumber() _ trainSeat.getSeatNumber() _ trainSeat.getFromStationId() _ trainSeat.getToStationId(),1);trainCacheService.hincr(trainNumber.getName() _ trainSeat.getTicket() _count,trainSeat.getFromStationId() _ trainSeat.getToStationId(),-1l);log.info(seat-1,trainNumber:{},trainSeat:{}, trainNumber, trainSeat);} else {log.info(status update not 1 or 2,no need care);}}} 参考文档canal使用说明文档