当前位置: 首页 > news >正文

广元市城乡规划建设监察大队网站住房和城乡建设部课题网站

广元市城乡规划建设监察大队网站,住房和城乡建设部课题网站,江苏省建设厅网站官网,民治营销型网站canal 数据异构组件 为啥要使用这个组件#xff1f; 在更新DB的时候不同步更新到redis#xff0c;es等数据库中#xff0c;时间太久#xff0c;而且可能会存在同步失败的问题#xff0c;因此引入canal去拉取DB的数据#xff0c;再去更新到redis#xff0c;es等数据库中 在更新DB的时候不同步更新到redises等数据库中时间太久而且可能会存在同步失败的问题因此引入canal去拉取DB的数据再去更新到redises等数据库中有失败重试和回滚等功能。 canal原理 canal 伪装成salve向mysql发送dump协议拿到备份数据binlog,去更新数据到redises等数据库中或者通过组装数据之后更新。canal可以拿到更新前的所有数据更新后的所有数据更新了哪些数据 canal 组件的使用 1.下载canal组件 下载地址canal组件下载地址 在我的资源中也有canal组件包 解压启动我是windows版双击startup.bat 2.数据库配置 1.开启MySQL , 需要先开启 Binlog 写入功能 [mysqld] log-binmysql-bin # 开启 binlog binlog-formatROW # 选择 ROW 模式 server_id1 # 配置 MySQL replaction 需要定义不要和 canal 的 slaveId 重复2.授权 canal 作为mysql 的slave 的权限 CREATE USER canal IDENTIFIED BY canal; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal%; -- GRANT ALL PRIVILEGES ON *.* TO canal% ; FLUSH PRIVILEGES;3.项目引入jar包 dependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.client/artifactIdversion1.1.4/version /dependency4.写canal监听数据工具类 package com.next.canal;import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress; import java.util.List;public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), example, , );int batchSize 1000;int emptyCount 0;try {connector.connect();connector.subscribe(.*\\..*);connector.rollback();int totalEmptyCount 120;while (emptyCount totalEmptyCount) {Message message connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId message.getId();int size message.getEntries().size();if (batchId -1 || size 0) {emptyCount;System.out.println(empty count : emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount 0;// System.out.printf(message[batchId%s,size%s] \n, batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println(empty too many times, exit);} finally {connector.disconnect();}}private static void printEntry(ListCanalEntry.Entry entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage null;try {rowChage CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException(ERROR ## parser of eromanga-event has an error , data: entry.toString(),e);}CanalEntry.EventType eventType rowChage.getEventType();System.out.println(String.format(gt; binlog[%s:%s] , name[%s,%s] , eventType : %s,entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType CanalEntry.EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType CanalEntry.EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println(-------gt; before);printColumn(rowData.getBeforeColumnsList());System.out.println(-------gt; after);printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(ListCanalEntry.Column columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() : column.getValue() update column.getUpdated());}} } 5.简单例子使用测试 1.数据库更改user_id从0改为1再从1改为0 2.查看canal监测的数据canal可以拿到更新前的所有数据更新后的所有数据更新了哪些数据 6.进一步完善canal监听数据工具类用于应用例子 1.加入监听器项目启动时启动 2.使用线程去监听数据 3.替换掉system.out.print()里面有锁会阻塞使用日志打印 4.处理canal监测到的数据 package com.next.canal;import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.next.dao.TrainNumberDetailMapper; import com.next.service.TrainNumberService; import com.next.service.TrainSeatService; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.stereotype.Service;import javax.annotation.Resource; import java.net.InetSocketAddress; import java.util.List;/*** desc 不要用system.out.print()里面有锁会阻塞用日志打印*/ Service Slf4j public class CanalSubscribe implements ApplicationListenerContextRefreshedEvent {Resourceprivate TrainSeatService trainSeatService;Resourceprivate TrainNumberService trainNumberService;//监听启动的时候就开始调用此监听方法Overridepublic void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {canalSubscribe();}private void canalSubscribe() {// 创建链接CanalConnector connector CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), example, , );int batchSize 1000;//使用线程new Thread(() - {try {log.info(canal subscribe);connector.connect();connector.subscribe(.*\\..*);connector.rollback();while (true) {Message message connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId message.getId();int size message.getEntries().size();if (batchId -1 || size 0) {//没有取到数据继续safeSleep(100);continue;}try {log.info(new message,batchIds:{},size:{}, batchId, batchSize);//打印日志printEntry(message.getEntries());// 提交确认connector.ack(batchId);} catch (Exception e2) {log.error(canal data exception,batchIds:{}, batchId, e2);// 处理失败, 回滚数据connector.rollback(batchId);}}} catch (Exception e3) {log.error(canal subscribe exception, e3);safeSleep(1000);canalSubscribe();}}).start();}private void printEntry(ListCanalEntry.Entry entrys) throws Exception{for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage null;try {rowChage CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException(RowChange.parse Exception , data: entry, e);}//更新类型-更新删除新增CanalEntry.EventType eventType rowChage.getEventType();//数据库名String schemaName entry.getHeader().getSchemaName();//表名String tableName entry.getHeader().getTableName();log.info(name:[{},{}],eventType:{},schemaName,tableName,eventType);for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType CanalEntry.EventType.DELETE) {handleColumn(rowData.getBeforeColumnsList(), eventType, schemaName, tableName);} else {handleColumn(rowData.getAfterColumnsList(), eventType, schemaName, tableName);}}}}//处理canal监测到的数据private void handleColumn(ListCanalEntry.Column columnsList, CanalEntry.EventType eventType, String schemaName, String tableName) throws Exception{if(schemaName.contains(12306_seat_)){//处理座位变更trainSeatService.handle(columnsList,eventType);}else if(tableName.equals(train_number)){//车次详情处理(实际上是车次信息变更之后才批量处理车次详情)trainNumberService.handle(columnsList,eventType);}else{log.info(drop data,no need care);}}private void safeSleep(int millis) {try {Thread.sleep(100);} catch (Exception e1) {}}} 处理canal监测到的数据拿到改变的数据放到实体类中存到redis中 package com.next.service;import com.alibaba.otter.canal.protocol.CanalEntry; import com.next.dao.TrainNumberMapper; import com.next.model.TrainNumber; import com.next.model.TrainSeat; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service;import javax.annotation.Resource; import java.util.List;Service Slf4j public class TrainSeatService {Resourceprivate TrainNumberMapper trainNumberMapper;Resourceprivate TrainCacheService trainCacheService;//处理座位canal通过监听座位库拿到改变的数据放到实体类中public void handle(ListCanalEntry.Column columns, CanalEntry.EventType eventType) {if (eventType ! CanalEntry.EventType.UPDATE) {log.info(not update,no need care);return;}TrainSeat trainSeat new TrainSeat();boolean isStatusUpdated false;for (CanalEntry.Column column : columns) {//票的状态改变了才做下面的操作if (column.getName().equals(status)) {trainSeat.setStatus(Integer.parseInt(column.getValue()));if (column.getUpdated()) {isStatusUpdated true;} else {break;}} else if (column.getName().equals(id)) {trainSeat.setId(Long.parseLong(column.getValue()));} else if (column.getName().equals(carriage_number)) {trainSeat.setCarriageNumber(Integer.parseInt(column.getValue()));} else if (column.getName().equals(row_number)) {trainSeat.setRowNumber(Integer.parseInt(column.getValue()));} else if (column.getName().equals(seat_number)) {trainSeat.setSeatNumber(Integer.parseInt(column.getValue()));} else if (column.getName().equals(train_number_id)) {trainSeat.setTrainNumberId(Integer.parseInt(column.getValue()));} else if (column.getName().equals(ticket)) {trainSeat.setTicket(column.getValue());} else if (column.getName().equals(from_station_id)) {trainSeat.setFromStationId(Integer.parseInt(column.getValue()));} else if (column.getName().equals(to_station_id)) {trainSeat.setToStationId(Integer.parseInt(column.getValue()));}}if (!isStatusUpdated) {log.info(status not update,no need care);}log.info(train seat update,trainSeat:{}, trainSeat);/*** 数据存到redis* 1.指定座位被占hash* cacheKey:车次_日期 D386_20231001* field: carriage_row_seat_fromStationId_toStationId* value: 0-空闲 1-占座** 2.每个座位详情剩余的座位数* cacheKey: 车次_日期_count D386_20231001_count* field: fromStationId_toStationId* value: 实际座位数**/TrainNumber trainNumber trainNumberMapper.selectByPrimaryKey(trainSeat.getTrainNumberId());//放票if (trainSeat.getStatus() 1) {trainCacheService.hset(trainNumber.getName() _ trainSeat.getTicket(),trainSeat.getCarriageNumber() _ trainSeat.getRowNumber() _ trainSeat.getSeatNumber() _ trainSeat.getFromStationId() _ trainSeat.getToStationId(),0);trainCacheService.hincr(trainNumber.getName() _ trainSeat.getTicket() _count,trainSeat.getFromStationId() _ trainSeat.getToStationId(),1l);log.info(seat1,trainNumber:{},trainSeat:{}, trainNumber, trainSeat);//占票} else if (trainSeat.getStatus() 2) {trainCacheService.hset(trainNumber.getName() _ trainSeat.getTicket(),trainSeat.getCarriageNumber() _ trainSeat.getRowNumber() _ trainSeat.getSeatNumber() _ trainSeat.getFromStationId() _ trainSeat.getToStationId(),1);trainCacheService.hincr(trainNumber.getName() _ trainSeat.getTicket() _count,trainSeat.getFromStationId() _ trainSeat.getToStationId(),-1l);log.info(seat-1,trainNumber:{},trainSeat:{}, trainNumber, trainSeat);} else {log.info(status update not 1 or 2,no need care);}}} 参考文档canal使用说明文档
http://www.zqtcl.cn/news/114153/

相关文章:

  • dedecms织梦搬家公司网站模板贵阳国家经济技术开发区门户网站
  • 网站架构设计师网络工程师的就业前景
  • 网站建设所需人员世界各国o2o响应式网站
  • 成都网站设计最加科技企业宣传片观后感
  • 人社门户网站建设方案非官方网站建设
  • 深圳系统网站开发做家具定制的设计网站
  • 网站制作学费多少钱网络推广的常用方法
  • 个人作品网站模板百度上做网站需要钱吗
  • 苏州网站建设行业研究思路 网站建设
  • 金泉网做网站找谁网站的结构布局
  • 网站开发摊销年限柳州网站建设哪家
  • 佛山市和城乡建设局网站首页武建安装公司新闻
  • 如何宣传商务网站网页制作与设计自考
  • 在国内的服务器上建设国外网站响应式单页网站模板
  • 平湖市住房建设局网站国外代理ip
  • 铁路建设监理网站地推项目发布平台
  • 我的世界做指令的网站网站如何在推广
  • 过年做那个网站致富盘锦网站建设vhkeji
  • 网站semseo先做哪个关键词投放
  • 药品招商网站大全南阳做网站公司电话
  • 优秀手机网站大学生创新产品设计作品
  • 备案期间关闭网站宝应人才网
  • 响应式网站一般做几个版本官网+wordpress
  • 太原网站建设方案服务佛山市建设工程有限公司
  • 智能网站建设平台php mysql 网站源码
  • 夏天做那些网站能致富百度关键词价格怎么查询
  • 厦门微信网站专业从事网站开发公司
  • 网站标题的写法湖南如何做网络营销
  • 设计做兼职的网站求推荐医院英文网站建设
  • 有没得办法可以查询一个网站有没得做竞价呀ai可以用来做网站吗