长春网站建设首选网诚传媒_,网站落地页是什么意思,网站建设设计模板,在床上做很黄很暴力网站前言
之前有发过一篇文章聊聊如何利用redis实现多级缓存同步。有个读者就给我留言说#xff0c;因为他项目的redis版本不是6.0版本#xff0c;因此他使用我文章介绍通过MQ来实现本地缓存同步#xff0c;他的同步流程大概如下图 他原来的业务流程是每天凌晨开启定时器去爬取…前言
之前有发过一篇文章聊聊如何利用redis实现多级缓存同步。有个读者就给我留言说因为他项目的redis版本不是6.0版本因此他使用我文章介绍通过MQ来实现本地缓存同步他的同步流程大概如下图 他原来的业务流程是每天凌晨开启定时器去爬取第三方的数据并持久化到redis后边因为redis发生过宕机事故他碰巧看了我文章就觉得可以用使用多级缓存的策略用来做个兜底。他的业务流程就如上图即每天凌晨开启定时器去爬取第三方数据持久化到redis和其中一台服务的本地缓存然后将爬取到的业务数据发送到kafka其他业务服务通过订阅kafka将业务数据保存到本地缓存。
他改造完某天突然发现在集群环境中只要其中一台服务消费了kafka数据其他就消费不到。今天就借这个话题来聊聊集群环境中本地缓存如何进行同步
前置知识
kafka消费topic-partitions模式分为subscribe模式和assign模式。subscribe模式需要指定group.id,该模式会为consumer自动分配partition,且同一个group.id下的不同consumer不会消费同样的分区。assign模式需要为consumer手动、显示的指定需要消费的topic-partitions不受group.id限制相当与指定的group.id无效。通俗一点讲就是assign模式下所有消费者都可以订阅指定分区
我们要通过消息队列实现本地缓存同步本质上就是需要利用消息队列提供广播能力而kafka默认不具备。不过我们可以根据kafka提供的消费模式进行定制从而是kafka也具备广播能力
集群本地缓存同步方案
方案一利用MQ广播能力
因为读者项目是使用kafka且项目是使用spring-kafka我们也就以此为例 1、subscribe模式 通过前置知识我们了解到在subscribe模式下同一个group.id下的不同consumer不会消费同样的分区这就意味我们可以通过指定不同group.id来消费同样分区达到广播的效果
那如何在同个集群服务实现不同的group.id?
此时Spring EL 表达式就派上用场了我们通过 Spring EL 表达式在每个消费者分组的名字上配合 UUID 生成其后缀。这样就能保证每个项目启动的消费者分组不同从而达到广播消费的目的
示例 KafkaListener(topics ${userCache.topic},groupId ${userCache.topic}_group_ #{T(java.util.UUID).randomUUID()}))public void receive(Acknowledgment ack, String data){System.out.println(String.format(serverPort:【%s】,接收到数据【%s】,serverPort,data));ack.acknowledge();}如果我们决定UUID不直观我们也可以使用IP作为标识只要能保证同个集群服务的group.id是唯一即可
不过如果要改成ip我们得做一定的改造。改造步骤如下
a、 获取ip地址信息并放入environment
public class ServerAddrEnvironmentPostProcessor implements EnvironmentPostProcessor{private String SERVER_ADDRESS server.addr;OverrideSneakyThrowspublic void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {MutablePropertySources propertySources environment.getPropertySources();MapString, Object source new HashMap();String serverAddr InetAddress.getLocalHost().getHostAddress();source.put(SERVER_ADDRESS,serverAddr);MapPropertySource mapPropertySource new MapPropertySource(serverAddrProperties,source);propertySources.addFirst(mapPropertySource);}}
b、 配置spi
在src/main/resource目录下配置META-INF/spring.factories配置内容如下
org.springframework.boot.env.EnvironmentPostProcessor\
com.github.lybgeek.comsumer.ip.ServerAddrEnvironmentPostProcessorc、 KafkaListener配置如下内容 KafkaListener(topics ${userCache.topic},groupId ${userCache.topic}_group_ ${server.addr} _${server.port})小结
该方式的实现优点是比较简单但如果需要对服务进行运维监控统计那就不怎么友好了虽然指定IP会比随机UUID好点但如果是容器化部署每次部署其IP也是会变化这样跟随机指定UUID差别也不大了。其次如果是使用云产品比如阿里云对comsume group是有数量上限且消费者组需要提前创建这种情况使用该方案就不是很合适了 assign模式 通过assign模式手动消费对应的分区
示例 KafkaListener(topicPartitions {TopicPartition(topic ${userCache.topic}, partitions 0)})public void receive(Acknowledgment ack, ConsumerRecord record){System.out.println(String.format(serverPort:【%s】,接收到数据【%s】,serverPort,record));ack.acknowledge();}
小结
该方式实现也是很简单如果我们不需要动态创建新的分区用该方案实现广播会是一个不错的选择。不过该方式的缺点很明显因为是手动指定分区当该分区有问题也挺麻烦的
方案二通过定时器触发
该方案主要基于读者目前的同步进行改造改造后如下图 核心就是根据读者业务的特性因为他是定时每天晚上同步爬取那就意味着他这个数据至少在当天基本不变就可以让集群里的服务都定时执行此时仅需将xxl-job的调度策略改成分片广播就行这样就可以持久化到redis的同时也持久化到本地缓存
小结
该方案改动量比较小有个小缺点就是因为集群内所有服务都执行调度这样就会使redis重复持久化不过问题也不大就是好。最后读者选择该方案
总结
本文主要阐述集群环境中本地缓存如何进行同步之前还有读者问我说使用了多级缓存数据一致性要如何保证以前我可能会从技术角度来回答比如你可以延迟双删或者如果你是mysql你可以使用canalmq更甚者你可以使用分布式锁来保证。但现在我更多从业务角度来思考这件事情你都考虑使用缓存是不是意味着你在业务上是可以容忍一定不一致性既然可以容忍是不是最终可以通过一些补偿方案来解决这个不一致性
没有完美的方案你此时感觉的完美方案可能是当时在那个业务场景下做了一个贴合业务的权衡
demo链接
https://github.com/lyb-geek/springboot-learning/tree/master/springboot-kafka-broadcast