网站侧边栏怎么做,什么网站可以做医疗设备的,查询关键词密度网站的网址有哪些,网站建设 会计分录1.引言
业务程序经常会通过各式各样的缓存来提升用户的访问速度。
由于存在缓存#xff0c;在一些实时性要求较高的场景中#xff0c;需要在数据变更的同时将数据缓存进行更新或删除。
如果数据本身由其他业务部门提供#xff0c;就无法在写入的同时做缓存的一致性处理。…1.引言
业务程序经常会通过各式各样的缓存来提升用户的访问速度。
由于存在缓存在一些实时性要求较高的场景中需要在数据变更的同时将数据缓存进行更新或删除。
如果数据本身由其他业务部门提供就无法在写入的同时做缓存的一致性处理。
此时可以通过其他业务部门暴露数据变更通知来感知到数据变化从而保证数据的更新及时性。
TiCDC 是一款 TiDB 增量数据同步工具通过拉取上游 TiKV 的数据变更日志TiCDC 可以将数据解析为有序的行级变更数据输出到下游。
因此可以通过 TiCDC 将数据变更通知暴露给业务程序来让业务程序做及时的对应处理逻辑。
本文以一张用户表的数据变更为例来展示 Java 服务端接收一条 TiCDC Canal-JSON 的消息变更解析数据并转发给对应的业务处理程序的流程。 之前写类似的程序时网上搜索到的案例还是比较少的本文仅抛砖引玉欢迎各位大佬批评指正 2. 代码思路
1通过 kafka 消息获取 CDC 消息
2解析 CDC 消息判断其数据变更类型执行对应的处理逻辑
3. 代码实现
3.1 代码结构
cdc-demo
└─ src└─ main└─ java└─ com.example.demo├─ constants│ └─ CdcConstants.java├─ dto│ ├─ CdcMessage.java│ └─ User.java├─ job│ ├─ CdcJob.java│ └─ UserCdcJob.java└─ service├─ impl│ └─ UserServiceImpl.java└─ CdcService.java3.2 CDC 常量类
public class CdcConstants {
public enum MessageType {/*** 插入操作*/INSERT,
/*** 更新操作*/UPDATE,
/*** 删除操作*/DELETE;}
}3.3 实体类
3.3.1 用户实体类
Getter
Setter
public class User {
/*** 用户id*/private Long id;
/*** 用户名*/private String name;
/*** 年龄*/private Integer age;
}3.3.2 CDC 消息实体类
Getter
Setter
public class CdcMessageT {
/*** 数据集合*/private ListT data;
/*** 数据库名称*/private String database;
/*** 是否为DDL语句isDdl*/private boolean isDdl;
/*** 表结构的类型字段值为字段类型如varchar*/private T mysqlType;
/*** UPDATE类型下的旧数据未变更字段无数据*/private ListT oldData;
/*** sql语句*/private String sql;
/*** 值为int类型*/private T sqlType;
/*** 数据表名*/private String table;
/*** 新增INSERT、更新UPDATE、删除DELETE、删除表ERASE等*/private String type;
}3.4 任务类
3.4.1 CDC 任务基类
Slf4j
public class CdcJobT {
protected CdcServiceT cdcService;
/*** 处理消息** param record 消息记录* param ack 消息处理标识*/public void handleMessage(ConsumerRecordString, String record, Acknowledgment ack) {String recordString String.format(topic:%s,partition:%s,offset:%s,value:%s,record.topic(),record.partition(),record.offset(),record.value());log.info(数据更新开始处理消息 recordString);try {boolean processResult process(record);String processResultString processResult ? 成功 : 无更新;log.info(数据更新处理结束处理结果 processResultString);} catch (Exception e) {log.error(数据更新报错, e);} finally {// 手动提交偏移量ack.acknowledge();}}
/*** 处理数据** param record kafka消费记录* return 处理结果*/public boolean process(ConsumerRecordString, String record) {String bizName this.getClass().getSimpleName();// 服务为初始化报错if (null cdcService) {throw new IllegalStateException(服务未初始化);}
// 解析消息CdcMessageT cdcMessage JSON.parseObject(record.value(), new TypeReferenceCdcMessageT() {});
// 跳过DDLif (cdcMessage.isDdl()) {log.info(bizName, DDL变更无需处理);return false;}// 处理结果初始化boolean result false;// 服务层处理数据ListT dataList cdcMessage.getData();if (CdcConstants.MessageType.INSERT.name().equals(cdcMessage.getType())) {result cdcService.insert(dataList);} else if (CdcConstants.MessageType.UPDATE.name().equals(cdcMessage.getType())) {result cdcService.update(cdcMessage.getOldData(), dataList);} else if (CdcConstants.MessageType.DELETE.name().equals(cdcMessage.getType())) {result cdcService.delete(dataList);} else {log.warn(bizName, 不处理该消息消息类型 cdcMessage.getType());}return result;}
}3.4.2 用户表 CDC 消费任务类
Component
public class UserCdcJob extends CdcJobUser {
public UserCdcJob(UserServiceImpl userService) {this.cdcService userService;}
/*** 消费CDC消息并进行处理** param record 消息记录* param ack 消息处理标识*/KafkaListener(id UserCdcJob, groupId ${user-cdc.group},topics {${user-cdc.topic}}, containerFactory cdcKafkaListenerFactory)public void consumer(ConsumerRecordString, String record, Acknowledgment ack) {handleMessage(record, ack);}
}3.5 服务类
3.5.1 CDC 消息处理服务接口
public interface CdcServiceT {
/*** 插入数据** param data 数据* return 插入结果*/boolean insert(ListT data);
/*** 更新数据** param oldData 更新前数据* param newData 更新后数据* return 更新结果*/boolean update(ListT oldData, ListT newData);
/*** 删除数据** param data 数据* return 删除数据*/boolean delete(ListT data);
}3.5.2 用户服务实现类
Service
public class UserServiceImpl implements CdcServiceUser {
Overridepublic boolean insert(ListUser data) {// TODOreturn false;}
Overridepublic boolean update(ListUser oldData, ListUser newData) {// TODOreturn false;}
Overridepublic boolean delete(ListUser data) {// TODOreturn false;}
}4.参考文档
TiCDC 简介https://docs.pingcap.com/zh/tidb/stable/ticdc-overview
TiCDC Canal-JSON 协议https://docs.pingcap.com/zh/tidb/stable/ticdc-canal-json