网站被攻击会影响收录么,微信网站 影楼,制作单网页网站,财务软件免费版目录 一、MQ简介
MQ的作用主要有以下三个方面
二、RocketMQ产品特点
1、RocketMQ介绍
2、RocketMQ特点
三、RocketMQ实战
1、快速搭建RocketMQ服务
2、快速实现消息收发
1. 命令行快速实现消息收发
2. 搭建Maven客户端项目
3、搭建RocketMQ可视化管理服务
4、升级分…目录 一、MQ简介
MQ的作用主要有以下三个方面
二、RocketMQ产品特点
1、RocketMQ介绍
2、RocketMQ特点
三、RocketMQ实战
1、快速搭建RocketMQ服务
2、快速实现消息收发
1. 命令行快速实现消息收发
2. 搭建Maven客户端项目
3、搭建RocketMQ可视化管理服务
4、升级分布式集群
5、升级高可用集群 关于Dledger集群的一些补充 一、MQ简介
MQMessageQueue消息队列。是在互联网中使用非常广泛的一系列服务中间件。 这个词可以分两个部分来看一是Message消息。消息是在不同进程之间传递的数据。这些进程可以部署在同一台机器上也可以分布在不同机器上。二是Queue队列。队列原意是指一种具有FIFO(先进先出)特性的数据结构是用来缓存数据的。对于消息中间件产品来说能不能保证FIFO特性尚值得考量。但是所有消息队列都是需要具备存储消息让消息排队的能力。 广义上来说只要能够实现消息跨进程传输以及队列数据缓存就可以称之为消息队列。例如我们常用的QQ、微信、阿里旺旺等就都具备了这样的功能。只不过它们对接的使用对象是人而我们这里讨论的MQ产品需要对接的使用对象是应用程序。 MQ的作用主要有以下三个方面
异步
例子快递员发快递直接到客户家效率会很低。引入菜鸟驿站后快递员只需要把快递放到菜鸟驿站就可以继续发其它快递去了。客户再按自己的时间安排去菜鸟驿站取快递。
作用异步能提高系统的响应速度、吞吐量。 解耦
例子《Thinking in JAVA》很经典但是都是英文我们看不懂所以需要编辑社将文章翻译成其它语言这样就可以完成英语与其它语言的交流。
作用
1、服务之间进行解耦才可以减少服务之间的影响。提高系统整体的稳定性以及可扩展性。
2、另外解耦后可以实现数据分发。生产者发送一个消息后可以由一个或者多个消费者进行消费并且消费者的增加或者减少对生产者没有影响。 削峰
例子长江每年都会涨水但是下游出水口的速度是基本稳定的所以会涨水。引入三峡大坝后可以把水储存起来下游慢慢排水。
作用以稳定的系统资源应对突发的流量冲击。 二、RocketMQ产品特点
1、RocketMQ介绍
RocketMQ是阿里巴巴开源的一个消息中间件在阿里内部历经了双十一等很多高并发场景的考验能够处理亿万级别的消息。2016年开源后捐赠给Apache现在是Apache的一个顶级项目。 早期阿里使用ActiveMQ但是当消息开始逐渐增多后ActiveMQ的IO性能很快达到了瓶颈。于是阿里开始关注Kafka。但是Kafka是针对日志收集场景设计的它的高级功能并不是很贴合阿里的业务场景。尤其当它的Topic过多时由于Partition文件也会过多这就会加大文件索引的耗时会严重影响IO性能。于是阿里才决定自研中间件最早叫做MetaQ后来改名成为RocketMQ。最早它所希望解决的最大问题就是多Topic下的IO性能压力。但是产品在阿里内部的不断改进RocketMQ开始体现出一些不一样的优势。 2、RocketMQ特点
当今互联网MQ产品众多其中影响力和使用范围最大的当数Apache Kafka、RabbitMQ、Apache RocketMQ以及Apache Plusar。这几大产品虽然都是典型的MQ产品但是由于设计和实现上的一些差异造成它们适合于不同的细分场景。 优点 缺点 适合场景 Apache Kafka 吞吐量非常大性能非常好集群高可用 会有丢数据的可能功能比较单一 日志分析、大数据采集 RabbitMQ 消息可靠性高功能全面 erlang语言不好定制吞吐量比较低 企业内部小规模服务调用 Apache Pulsar 基于Bookeeper构建消息可靠性非常高 周边生态还有差距目前使用的公司比较少 企业内部大规模服务调用 Apache RocketMQ 高吞吐、高性能、高可用功能全面。客户端协议丰富。使用java语言开发方便定制 服务加载比较慢 几乎全场景特别适合金融场景
其中RocketMQ孵化自阿里巴巴。历经阿里多年双十一的严格考验RocketMQ可以说是从全世界最严苛的高并发场景中摸爬滚打出来的过硬产品也是少数几个在金融场景比较适用的MQ产品。从横向对比来看RocketMQ与Kafka和RabbitMQ相比。RocketMQ的消息吞吐量虽然和Kafka相比还是稍有差距但是却比RabbitMQ高很多。在阿里内部RocketMQ集群每天处理的请求数超过5万亿次支持的核心应用超过3000个。而RocketMQ最大的优势就是它天生就为金融互联网而生。它的消息可靠性相比Kafka也有了很大的提升而消息吞吐量相比RabbitMQ也有很大的提升。另外RocketMQ的高级功能也越来越全面广播消费、延迟队列、死信队列等等高级功能一应俱全甚至某些业务功能比如事务消息已经呈现出领先潮流的趋势。 三、RocketMQ实战
1、快速搭建RocketMQ服务
RocketMQ的官网地址 http://rocketmq.apache.org 。在下载页面可以获取RocketMQ的源码包以及运行包。下载页面地址https://rocketmq.apache.org/download。 当前最新的版本是5.x这是一个着眼于云原生的新版本给 RocketMQ 带来了非常多很亮眼的新特性。但是目前来看企业中用得还比较少。因此这里采用的还是更为稳定的4.9.5版本。
注在2020年下半年RocketMQ新推出了5.0的大版本这对于RocketMQ来说是一个里程碑式的大版本。在这个大版本中RocketMQ对整体功能做了一次大的升级。增加了很多非常有用的新特性也对已有功能重新做了升级。
比如在具体功能方面在4.x版本中对于定时消息只能设定几个固定的延迟级别而5.0版本中已经可以指定具体的发送时间了。在客户端语言方面4.x版本RocketMQ原生只支持基于Netty框架的Java客户端。而在5.0版本中增加了对Grpc协议的支持这基本上就解除了对客户端语言的限制。在服务端架构方面4.x版本只支持固定角色的普通集群和可以动态切换角色的Dledger集群而在5.0版本中增加了Dledger Controller混合集群模式即可以混合使用Dledger的集群机制以及 Broker 本地的文件管理机制。
但是功能强大同时也意味着问题会很多。所以目前来看企业中直接用新版本的还比较少。小部分使用新版本的企业也大都是使用内部的改造优化版本。 运行只需要下载Binary运行版本就可以了。 当然源码包也建议下载下来后续会进行解读。运行包下载下来后就可以直接解压上传到服务器上。我们这里会上传到/app/rocketmq目录。解压后几个重要的目录如下: 接下来RocketMQ建议的运行环境需要至少12G的内存这是生产环境比较理想的资源配置。但是学习阶段如果服务器没有这么大的内存空间那么就需要做一下调整。进入bin目录对其中的runserver.sh和runbroker.sh两个脚本进行一下修改。
使用vi runserver.sh指令编辑这个脚本找到下面的一行配置调整Java进程的内存大小。
JAVA_OPT${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize128m -XX:MaxMetaspaceSize320m 接下来同样调整runbroker.sh中的内存大小。
JAVA_OPT${JAVA_OPT} -server -Xms8g -Xmx8g
修改为
JAVA_OPT${JAVA_OPT} -server -Xms1g -Xmx1g 生产环境不建议调整。
调整完成后就可以启动RocketMQ服务了。 RocketMQ服务基于Java开发所以需要提前安装JDK。JDK建议采用1.8版本即可。
RocketMQ的后端服务分为nameserver和broker两个服务。接下来我们先将这两个服务启动起来。 第一步启动nameserver服务。
cd /app/rocketmq/rocketmq-all-4.9.5-bin-release
nohup bin/mqnamesrv 指令执行后会生成一个nohup.out的日志文件。在这个日志文件里如果看到下面这一条关键日志就表示nameserver服务启动成功了。
less nohup.out 接下来可以通过jsp指令进行验证。使用jps指令后可以看到有一个NamesrvStartup的进程运行也表示nameserver服务启动完成。 第二步启动broker服务。
启动broker服务之前要做一个小小的配置。进入RocketMQ安装目录下的conf目录修改broker.conf文件在文件最后面加入一个配置
autoCreateTopicEnabletrue 这个选项是为了便于进行后续实验。它的作用是允许 broker 端自动创建新的 Topic。
另外如果你的服务器配置了多张网卡比如阿里云腾讯云这样的云服务器它们通常有内网网卡和外网网卡两张网卡那么需要增加配置brokerIP1属性指向服务器的外网IP 地址这样才能确保从其它服务器上访问到RocketMQ 服务。 然后也可以用之前的方式启动broker服务。启动broker服务的指令是mqbroker
cd /app/rocketmq/rocketmq-all-4.9.5-bin-release
nohup bin/mqbroker 报错信息
Error: VM option UseG1GC is experimental and must be enabled via -XX:UnlockExperimentalWMOptions.
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
vi bin/runbroker.sh 使用修改配置 启动完成后同样检查nohup.out日志文件有如下一条关键日志就表示broker服务启动正常了。
The broker[xxxxx] boot success. serializeTypeJSON 注1、在实际服务部署时通常会将RocketMQ的部署地址添加到环境变量当中。例如使用vi ~/.bash_profile指令添加以下内容。
export ROCKETMQ_HOME/app/rocketmq/rocketmq-all-4.9.5-bin-releasePATH$ROCKETMQ_HOME/bin:$PATHexport PATH
这样就不必每次进入RocketMQ的安装目录了。直接可以使用mqnamesrv 和mqbroker指令。
2、停止RocketMQ服务可以通过mqshutdown指令进行
mqshutdown namesrv # 关闭nameserver服务
mqshutdown broker # 关闭broker服务 同样使用jps指令可以检查服务的启动状态。使用jsp指令后可以看到一个名为BrokerStartup的进程则表示broker服务启动完成。 2、快速实现消息收发
RocketMQ后端服务启动完成后就可以启动客户端的消息生产者和消息消费者进行消息转发了。接下来我们会先通过RocketMQ提供的命令行工具快速体验一下RocketMQ消息收发的功能。然后再动手搭建一个Maven项目在项目中使用RocketMQ进行消息收发。 1. 命令行快速实现消息收发
第一步需要配置一个环境变量NAMESRV_ADDR只想我们之前启动的nameserver服务。
通过vi ~/.bash_profile添加以下配置。然后使用source ~/.bash_profile让配置生效。
export NAMESRV_ADDRlocalhost:9876 第二步通过指令启动RocketMQ的消息生产者发送消息。
tools.sh org.apache.rocketmq.example.quickstart.Producer
如果连接报错尝试重启虚拟机即可解决
这个指令会默认往RocketMQ中发送1000条消息。在命令行窗口可以看到发送消息的日志 这部分日志中并没有打印出发送了什么消息。上面SendResult开头部分是消息发送到Broker后的结果。最后两行日志表示消息生产者发完消息后服务正常关闭了。 第三步可以启动消息消费者接收之前发送的消息
tools.sh org.apache.rocketmq.example.quickstart.Consumer
消费者启动完成后可以看到消费到的消息
......
ConsumeMessageThread_please_rename_unique_group_name_4_12 Receive New Messages:
[MessageExt [brokerNameworker1, queueId0, storeSize192, queueOffset221, sysFlag0,
bornTimestamp1694963761544, bornHost/192.168.189.131:54568, storeTimestamp1694963761545,
storeHost/192.168.189.131:10911, msgIdC0A8BD8300002A9F0000000000029692,
commitLogOffset169618, bodyCRC1080137936, reconsumeTimes0, preparedTransactionOffset0,
toString()Message{topicTopicTest, flag0, properties{MIN_OFFSET0, MAX_OFFSET250,
CONSUME_START_TIME1694965156155, UNIQ_KEY7F000001105F1B6D35865763D9880374,
CLUSTERDefaultCluster, TAGSTagA}, body[72, 101, 108, 108, 111, 32, 82, 111, 99, 107,
101, 116, 77, 81, 32, 56, 56, 52], transactionIdnull}]]
每一条这样的日志信息就表示消费者接收到了一条消息。
这个Consumer消费者的指令并不会主动结束它会继续挂起等待消费新的消息。我们可以使用CTRLC停止该进程。
注在RocketMQ提供的这个简单示例中并没有打印出传递的消息内容而是打印出了消息相关的很多重要的属性。
其中有几个比较重要的属性 brokerIdbrokerNamequeueIdmsgIdtopiccluster。 2. 搭建Maven客户端项目
之前的步骤实际上是在服务器上快速验证RocketMQ的服务状态接下来我们动手搭建一个RocketMQ的客户端应用在实际应用中集成使用RocketMQ。
第一步创建一个标准的maven项目在pom.xml中引入以下核心依赖
dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.9.5/version
/dependency 第二步就可以直接创建一个简单的消息生产者
public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {//初始化一个消息生产者DefaultMQProducer producer new DefaultMQProducer(please_rename_unique_group_name);// 指定nameserver地址producer.setNamesrvAddr(192.168.189.131:9876);// 启动消息生产者服务producer.start();for (int i 0; i 2; i) {try {// 创建消息。消息由Topic,Tag和body三个属性组成其中Body就是消息内容Message msg new Message(TopicTest,TagA,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));//发送消息获取发送结果SendResult sendResult producer.send(msg);System.out.printf(%s%n, sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}//消息发送完后停止消息生产者服务。producer.shutdown();}
}
运行其中的main方法就会往RocketMQ中发送两条消息。在这个实现过程中需要注意一下的是对于生产者需要指定对应的nameserver服务的地址这个地址需要指向你自己的服务器。 第三步创建一个消息消费者接收RocketMQ中的消息。
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//构建一个消息消费者DefaultMQPushConsumer consumer new DefaultMQPushConsumer(please_rename_unique_group_name_4);//指定nameserver地址consumer.setNamesrvAddr(192.168.189.131:9876);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 订阅一个感兴趣的话题这个话题需要与消息的topic一致consumer.subscribe(TopicTest, *);// 注册一个消息回调函数消费到消息后就会触发回调。consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {msgs.forEach(messageExt - {try {System.out.println(收到消息:new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {}});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者服务consumer.start();System.out.print(Consumer Started);}
}
运行其中的main方法后就可以启动一个RocketMQ消费者接收之前发到RocketMQ上的消息并将消息内容打印出来。在这个实现过程中需要重点关注的有两点。一是对于消费者同样需要指定nameserver的地址。二是消费者需要在RocketMQ中订阅具体的Topic只有发送到这个Topic上的消息才会被这个消费者接收到。
这样通过几个简单的步骤我们就完成了RocketMQ的应用集成。从这个过程中可以看到RocketMQ的使用是比较简单的。但是这并不意味着这几个简单的步骤就足够搭建一个生产级别的RocketMQ服务。接下来我们会一步步把我们这个简单的RocketMQ服务往一个生产级别的服务集群推进。 3、搭建RocketMQ可视化管理服务
在之前的简单实验中RocketMQ都是以后台服务的方式在运行我们并不很清楚RocketMQ是如何运行的。RocketMQ的社区就提供了一个图形化的管理控制台Dashboard可以用可视化的方式直接观测并管理RocketMQ的运行过程。
Dashboard服务并不在RocketMQ的运行包中需要到RocketMQ的官网下载页面单独下载。
下载 | RocketMQ 这里只提供了源码并没有提供直接运行的jar包。将源码下载下来后需要解压并进入对应的目录使用maven进行编译。(需要提前安装maven客户端)
mvn clean package -Dmaven.test.skiptrue
编译完成后在源码的target目录下会生成可运行的jar包rocketmq-dashboard-1.0.1-SNAPSHOT.jar。接下来可以将这个jar包上传到服务器上。我们上传到/app/rocketmq/rocketmq-dashboard目录下
mkdir rocketmq-dashboard
touch application.yml
接下来我们需要在jar包所在的目录下创建一个application.yml配置文件在配置文件中做如下配置
//加上application.yml配置启动失败去掉却启动成功了
rocketmq: config: namesrvAddrs: - 192.168.189.131:9876
主要是要指定nameserver的地址。
注关于这个配置文件中更多的配置选项可以参考一下dashboard源码当中的application.yml配置文件。 接下来就可以通过java指令执行这个jar包启动管理控制台服务。
java -jar rocketmq-dashboard-1.0.0.jar 1dashboard.log 21 应用启动完成后会在服务器上搭建起一个web服务我们就可以通过访问http://192.168.189.131:8080/#/查看到管理页面。 这个管理控制台的功能非常全面。驾驶舱页面展示RocketMQ近期的运行情况。运维页面主要是管理nameserver服务。集群页面主要管理RocketMQ的broker服务。很多信息都一目了然。在之后的过程中我们也会逐渐了解DashBoard管理页面中更多的细节。 4、升级分布式集群
之前我们用一台Linux服务器快速搭建起了一整套RocketMQ的服务。但是很明显这样搭建的服务是无法放到生产环境上去用的。一旦nameserver服务或者broker服务出现了问题整个RocketMQ就无法正常工作。而且更严重的是如果服务器出现了问题比如磁盘坏了那么存储在磁盘上的数据就会丢失。这时RocketMQ暂存到磁盘上的消息也会跟着丢失这个问题就非常严重了。因此我们需要搭建一个分布式的RocketMQ服务集群来防止单点故障问题。
RocketMQ的分布式集群基于主从架构搭建。在多个服务器组成的集群中指定一部分节点作为Master节点负责响应客户端的请求。指令另一部分节点作为Slave节点负责备份Master节点上的数据这样当Master节点出现故障时在Slave节点上可以保留有数据备份至少保证数据不会丢失。
整个集群方案如下图所示 接下来我们准备三台相同的Linux服务器搭建一下RocketMQ的分布式集群。为了更清晰的描述这三台服务器上的操作我们给每个服务器指定一个机器名。 打开 /etc/hosts 文件以编辑 sudo nano /etc/hosts 在文件中找到一个空白行然后添加以下内容将IP地址和机器名替换为您要设置的实际值 your_ip_address your_desired_hostname #每台机器都需要加这个机器名
192.168.189.131 worker1
192.168.189.132 worker2
192.168.189.133 worker3 ssh-keygen 是一个用于生成 SSH 密钥对的命令行工具它用于加密和验证 SSH 连接。SSH 密钥对通常包括一个私钥和一个公钥私钥用于身份验证而公钥用于将其添加到远程服务器以允许访问 ssh-keygen #生成 SSH 密钥命令 可以将公钥id_rsa.pub 文件的内容添加到远程服务器上以允许通过 SSH 密钥进行身份验证。通常您可以使用 ssh-copy-id 命令将公钥添加到远程服务器例如 ssh-copy-id userremote_host 在第一台机器worker1生成密钥ssh-keygen并执行以下命名将公钥添加到远程服务器worker2、worker3无需密码就可以访问
ssh-copy-id worker2
ssh-copy-id worker3
为了便于观察我们这次搭建一个2主2从的RocketMQ集群并将主节点和节点都分别部署在不同的服务器上。预备的集群规划情况如下 机器名 nameServer服务部署 broker服务部署 worker1 nameServer 不需要 worker2 nameServer broker-abroker-b-s worker3 nameServer broker-a-sbroker-b 把rocketmq 从 worker1传到worker2和worker3执行以下命令root是登录用户
scp -r rocketmq-all-4.9.5-bin-release/ rootworker2:/app/rocketmq/
scp -r rocketmq-all-4.9.5-bin-release/ rootworker3:/app/rocketmq/● rocketmq-all-4.9.5-bin-release要传输的本地文件的路径。● root远程虚拟机上的用户名。● worker3远程虚拟机的主机名或IP地址。● /app/rocketmq/远程虚拟机上文件的目标路径
第一步部署nameServer服务。
nameServer服务不需要做特别的配置按照之前的步骤在三台服务器上都分别部署nameServer服务即可。
cd /app/rocketmq/rocketmq-all-4.9.5-bin-release
nohup bin/mqnamesrv 第二步对Broker服务进行集群配置。
这里需要修改RocketMQ的配置文件对broker服务做一些集群相关的参数部署。这些配置文件并不需要我们手动进行创建在RocketMQ运行包的conf目录下提供了多种集群的部署配置文件模板。
2m-noslave: 2主无从的集群参考配置。这种集群存在单点故障。2m-2s-async和2m-2s-sync: 2主2从的集群参考配置。其中async和sync表示主节点与从节点之间是同步同步还是异步同步。关于这两个概念会在后续详细介绍dledger: 具备主从切换功能的高可用集群。集群中的节点会基于Raft协议随机选举出一个Leader其作用类似于Master节点。其它的节点都是follower其作用类似于Slave节点。
我们这次采用2m-2s-async的方式搭建集群需要在worker2和worker3上修改这个文件夹下的配置文件。 配置第一组broker-a服务
在worker2机器上配置broker-a的MASTER服务需要修改conf/2m-2s-async/broker-a.properties。示例配置如下
#所属集群名字名字一样的节点就在同一个集群内
brokerClusterNamerocketmq-cluster
#broker名字名字一样的节点就是一组主从节点。
brokerNamebroker-a
#brokerid,0就表示是Master0的都是表示 Slave
brokerId0
#nameServer地址分号分割
namesrvAddrworker1:9876;worker2:9876;worker3:9876
#是否允许 Broker 自动创建Topic建议线下开启线上关闭
autoCreateTopicEnabletrue
#凌晨4点删除
deleteWhen04
#刷盘时间120小时
fileReservedTime120
#存储路径
storePathRootDir/app/rocketmq/store
storePathCommitLog/app/rocketmq/store/commitlog
storePathConsumeQueue/app/rocketmq/store/consumequeue
storePathIndex/app/rocketmq/store/index
storeCheckpoint/app/rocketmq/store/checkpoint
abortFile/app/rocketmq/store/abort
#Broker 的角色
brokerRoleASYNC_MASTER
flushDiskTypeASYNC_FLUSH
#Broker 对外服务的监听端口主节点和从节点不能使用一样的端口
listenPort10911
这里对几个需要重点关注的属性做下简单介绍
brokerClusterName: 集群名。RocketMQ会将同一个局域网下所有brokerClusterName相同的服务自动组成一个集群这个集群可以作为一个整体对外提供服务brokerName: Broker服务名。同一个RocketMQ集群当中brokerName相同的多个服务会有一套相同的数据副本。同一个RocketMQ集群中是可以将消息分散存储到多个不同的brokerName服务上的。brokerId: RocketMQ中对每个服务的唯一标识。RocketMQ对brokerId定义了一套简单的规则master节点需要固定配置为0负责响应客户端的请求。slave节点配置成其它任意数字负责备份master上的消息。brokerRole: 服务的角色。这个属性有三个可选项ASYNC_MASTERSYNC_MASTER和SLAVE。其中ASYNC_MASTER和SYNC_MASTER表示当前节点是master节点目前暂时不用关心它们的区别。SLAVE则表示从节点。namesrvAddr: nameserver服务的地址。nameserver服务默认占用9876端口。多个nameserver地址用隔开。 接下来在worekr3上配置broker-a的SLAVE服务。需要修改conf/2m-2s-async/broker-a-s.properties。示例配置如下
#所属集群名字名字一样的节点就在同一个集群内
brokerClusterNamerocketmq-cluster
#broker名字名字一样的节点就是一组主从节点。
brokerNamebroker-a
#brokerid,0就表示是Master0的都是表示 Slave
brokerId1
#nameServer地址分号分割
namesrvAddrworker1:9876;worker2:9876;worker3:9876
#是否允许 Broker 自动创建Topic建议线下开启线上关闭
autoCreateTopicEnabletrue
deleteWhen04
fileReservedTime120
#存储路径
storePathRootDir/app/rocketmq/storeSlave
storePathCommitLog/app/rocketmq/storeSlave/commitlog
storePathConsumeQueue/app/rocketmq/storeSlave/consumequeue
storePathIndex/app/rocketmq/storeSlave/index
storeCheckpoint/app/rocketmq/storeSlave/checkpoint
abortFile/app/rocketmq/storeSlave/abort
#Broker 的角色
brokerRoleSLAVE
flushDiskTypeASYNC_FLUSH
#Broker 对外服务的监听端口
listenPort11011
其中关键是brokerClusterName和brokerName两个参数需要与worker2上对应的broker-a.properties配置匹配。brokerId配置0以外的数字。然后brokerRole配置为SLAVE。
这样第一组broker服务就配置好了。 配置第二组borker-b服务
与第一组broker-a服务的配置方式类似在worker3上配置broker-b的MASTER服务。需要修改conf/2m-2s-async/broker-b.properties文件配置示例如下
#所属集群名字名字一样的节点就在同一个集群内
brokerClusterNamerocketmq-cluster
#broker名字名字一样的节点就是一组主从节点。
brokerNamebroker-b
#brokerid,0就表示是Master0的都是表示 Slave
brokerId0
#nameServer地址分号分割
namesrvAddrworker1:9876;worker2:9876;worker3:9876
#是否允许 Broker 自动创建Topic建议线下开启线上关闭
autoCreateTopicEnabletrue
deleteWhen04
fileReservedTime120
#存储路径
storePathRootDir/app/rocketmq/store
storePathCommitLog/app/rocketmq/store/commitlog
storePathConsumeQueue/app/rocketmq/store/consumequeue
storePathIndex/app/rocketmq/store/index
storeCheckpoint/app/rocketmq/store/checkpoint
abortFile/app/rocketmq/store/abort
#Broker 的角色
brokerRoleASYNC_MASTER
flushDiskTypeASYNC_FLUSH
#Broker 对外服务的监听端口
listenPort10911 在worker2上配置broker-b的SLAVE服务。需要修改conf/2m-2s-async/broker-b-s.properties文件配置示例如下
#所属集群名字名字一样的节点就在同一个集群内
brokerClusterNamerocketmq-cluster
#broker名字名字一样的节点就是一组主从节点。
brokerNamebroker-b
#brokerid,0就表示是Master0的都是表示 Slave
brokerId1
#nameServer地址分号分割
namesrvAddrworker1:9876;worker2:9876;worker3:9876
#是否允许 Broker 自动创建Topic建议线下开启线上关闭
autoCreateTopicEnabletrue
deleteWhen04
fileReservedTime120
#存储路径
storePathRootDir/app/rocketmq/storeSlave
storePathCommitLog/app/rocketmq/storeSlave/commitlog
storePathConsumeQueue/app/rocketmq/storeSlave/consumequeue
storePathIndex/app/rocketmq/storeSlave/index
storeCheckpoint/app/rocketmq/storeSlave/checkpoint
abortFile/app/rocketmq/storeSlave/abort
#Broker 的角色
brokerRoleSLAVE
flushDiskTypeASYNC_FLUSH
#Broker 对外服务的监听端口
listenPort11011
这样就完成了2主2从集群的配置。配置过程汇总有几个需要注意的配置项
store开头的一系列配置表示RocketMQ的存盘文件地址。在同一个机器上需要部署多个Broker服务时不同服务的存储目录不能相同。listenPort表示Broker对外提供服务的端口。这个端口默认是10911。在同一个机器上部署多个Broker服务时不同服务占用的端口也不能相同。如果你使用的是多网卡的服务器比如阿里云上的云服务器那么就需要在配置文件中增加配置一个brokerIP1属性指向所在机器的外网网卡地址。 第三步启动Broker服务
集群配置完成后需要启动Broker服务。与之前启动broker服务稍有不同启动时需要增加-c参数指向我们修改的配置文件。
在worker2上启动broker-a的master服务和broker-b的slave服务
cd /app/rocketmq/rocketmq-all-4.9.5-bin-release
nohup bin/mqbroker -c ./conf/2m-2s-async/broker-a.properties
nohup bin/mqbroker -c ./conf/2m-2s-async/broker-b-s.properties 在worker3上启动broker-b的master服务和broker-a的slave服务
cd /app/rocketmq/rocketmq-all-4.9.5-bin-release
nohup bin/mqbroker -c ./conf/2m-2s-async/broker-b.properties
nohup bin/mqbroker -c ./conf/2m-2s-async/broker-a-s.properties 第四步检查集群服务状态
对于服务的启动状态我们依然可以用之前介绍的jps指令以及nohup.out日志文件进行跟踪。不过在RocketMQ的bin目录下也提供了mqadmin指令可以通过命令行的方式管理RocketMQ集群。
例如下面的指令可以查看集群broker集群状态。通过这个指令可以及时了解集群的运行状态。
mqadmin clusterList 注执行这个指令需要在机器上配置了NAMESRV环境变量
mqadmin指令还提供了非常丰富的管理功能。你可以尝试直接使用mqadmin指令就会列出mqadmin支持的所有管理指令。如果对某一个指令不会使用还可以使用mqadmin help 指令查看帮助。 另外之前搭建的dashboard也是集群服务状态的很好的工具。只需要在之前搭建Dashboard时创建的配置文件中增加指定nameserver地址即可。
rocketmq: config: namesrvAddrs: - worker1:9876 - worker2:9876- worker3:9876 启动完成后在集群菜单页就可以看到集群的运行情况 在RocketMQ的这种主从架构的集群下客户端发送的消息会分散保存到broker-a和broker-b两个服务上然后每个服务都配有slave服务可以备份对应master服务上的消息这样就可以防止单点故障造成的消息丢失问题。 在worker3上执行 mqshutdown broker 停止 broker-a-sbroker-b没有发生节点主从切换防止了消息的丢失 通过指令启动RocketMQ的消息生产者发送1000条消息。在worker1机器上执行
tools.sh org.apache.rocketmq.example.quickstart.Producer 可以发现这1000条消息都是broker-a 主节点master接收broker-b从节点slave没有解释牺牲了可用性 5、升级高可用集群
主从架构的RocketMQ集群由于给每个broker服务配置了一个或多个slave备份服务可以保证当broker服务出现问题时broker上的消息不会丢失。但是这种主从架构的集群却也有一个不足的地方那就是不具备服务高可用。
这里所说的服务高可用并不是并不是指整个RocketMQ集群就不能对外提供服务了而是指集群中的消息就不完整了。实际上当RocketMQ集群中的broker宕机后整个集群会自动进行broker状态感知。后续客户端的各种请求依然可以转发到其它正常的broker上。只不过原本保存在当前broker上的消息就无法正常读取了需要等到当前broker服务重启后才能重新被消息消费者读取。
当一个broker上的服务宕机后我们可以从对应的slave服务上找到broker上所有的消息。但是很可惜主从架构中各个服务的角色都是固定了的slave服务虽然拥有全部的数据但是它没办法升级成为master服务去响应客户端的请求依然只是傻傻等待master服务重启后继续做它的数据备份工作。
这时我们自然就希望这个slave服务可以升级成为master服务继续响应客户端的各种请求这样整个集群的消息服务就不会有任何中断。而RocketMQ提供的Dledger集群就是具备角色自动转换功能的高可用集群。 整个集群结构如下图所示 在Dledger集群中就不再单独指定各个broker的服务而是由这些broker服务自行进行选举产生一个Leader角色的服务响应客户端的各种请求。而其它的broker服务就作为Follower角色负责对Leader上的数据进行备份。当然Follower所要负责的事情比主从架构中的SLAVE角色会复杂一点因为这种节点选举是在后端不断进行的它们需要随时做好升级成Leader的准备。
Dledger集群的选举是通过Raft协议进行的Raft协议是一种多数同意机制。也就是每次选举需要有集群中超过半数的节点确认才能形成整个集群的共同决定。同时这也意味着在Dledger集群中只要有超过半数的节点能够正常工作那么整个集群就能正常工作。因此在部署Dledger集群时通常都是部署奇数台服务这样可以让集群的容错性达到最大。 接下来我们就用之前准备的3台服务器搭建一个3个节点的Dledger集群。在这个集群中只需要有2台Broker服务正常运行这个集群就能正常工作。 第一步部署nameserver
这一步和之前部署主从集群没有区别不需要做过多的配置直接在三台服务器上启动nameserver服务即可。
实际上如果你是从上一个主从架构开始搭建起来的话那么nameserver集群都不需要重新启动nameserver会自动感知到broker的变化。 第二步对Broker服务进行集群配置。
对于Dledger集群的配置RocketMQ依然贴心的给出了完整的示例不需要强行记忆。
在conf/dledger目录下RocketMQ默认给出了三个配置文件这三个配置文件可以在单机情况下直接部署成一个具有三个broker服务的Dledger集群我们只需要按照这个配置进行修改即可。
注在RocketMQ运行包的bin/dledger目录下RocketMQ还提供了一个fast-try.sh脚本。这个脚本会指定conf/deldger目录下的配置文件直接启动有三个broker服务的Dledger集群。每个集群指定的内存大小占用1G。 接下来我们可以在三台机器的conf/dledger目录下都创建一个broker.conf文件对每个broker服务进行配置。 worker1的broker.conf配置示例
brokerClusterName RaftCluster
brokerNameRaftNode00
listenPort30911
namesrvAddrworker1:9876;worker2:9876;worker3:9876
storePathRootDir/app/rocketmq/storeDledger/
storePathCommitLog/app/rocketmq/storeDledger/commitlog
storePathConsumeQueue/app/rocketmq/storeDledger/consumequeue
storePathIndex/app/rocketmq/storeDledger/index
storeCheckpoint/app/rocketmq/storeDledger/checkpoint
abortFile/app/rocketmq/storeDledger/abort
enableDLegerCommitLogtrue
dLegerGroupRaftNode00
dLegerPeersn0-worker1:40911;n1-worker2:40911;n2-worker3:40911
## must be unique
dLegerSelfIdn0
sendMessageThreadPoolNums16 worker2的broker.conf配置示例
brokerClusterName RaftCluster
brokerNameRaftNode00
listenPort30911
namesrvAddrworker1:9876;worker2:9876;worker3:9876
storePathRootDir/app/rocketmq/storeDledger/
storePathCommitLog/app/rocketmq/storeDledger/commitlog
storePathConsumeQueue/app/rocketmq/storeDledger/consumequeue
storePathIndex/app/rocketmq/storeDledger/index
storeCheckpoint/app/rocketmq/storeDledger/checkpoint
abortFile/app/rocketmq/storeDledger/abort
enableDLegerCommitLogtrue
dLegerGroupRaftNode00
dLegerPeersn0-worker1:40911;n1-worker2:40911;n2-worker3:40911
## must be unique
dLegerSelfIdn1
sendMessageThreadPoolNums16 worker3的broker.conf配置示例
brokerClusterName RaftCluster
brokerNameRaftNode00
listenPort30911
namesrvAddrworker1:9876;worker2:9876;worker3:9876
storePathRootDir/app/rocketmq/storeDledger/
storePathCommitLog/app/rocketmq/storeDledger/commitlog
storePathConsumeQueue/app/rocketmq/storeDledger/consumequeue
storePathIndex/app/rocketmq/storeDledger/index
storeCheckpoint/app/rocketmq/storeDledger/checkpoint
abortFile/app/rocketmq/storeDledger/abort
enableDLegerCommitLogtrue
dLegerGroupRaftNode00
dLegerPeersn0-worker1:40911;n1-worker2:40911;n2-worker3:40911
## must be unique
dLegerSelfIdn2
sendMessageThreadPoolNums16
这里对几个需要重点关注的配置项做下介绍
enableDLegerCommitLog: 是否启动Dledger。true表示启动namesrvAddr: 指定nameserver地址dLedgerGroup: Dledger Raft Group的名字建议跟brokerName保持一致。dLedgerPeers: Dledger Group内各个服务节点的地址及端口信息。同一个Group内的各个节点配置必须要保持一致。dLedgerSelfId: Dledger节点ID必须属于dLedgerPeers中的一个。同一个Group内的各个节点必须不能重复。sendMessageThreadPoolNumsdLedger内部发送消息的线程数建议配置成cpu核心数。store开头的一系列配置 这些是配置dLedger集群的消息存盘目录。如果你是从主从架构升级成为dLedger架构那么这个地址可以指向之前搭建住主从架构的地址。dLedger集群会兼容主从架构集群的消息格式只不过主从架构的消息无法享受dLedger集群的两阶段同步功能。 第三步启动broker服务
和启动主从架构的broker服务一样我们只需要在启动broker服务时指定配置文件即可。在三台服务器上分别执行以下指令启动broker服务。
cd /app/rocketmq/rocketmq-all-4.9.5-bin-release/
nohup bin/mqbroker -c conf/dledger/broker.conf 第四步检查集群服务状态
我们可以在Dashboard控制台的集群菜单页看到Dledger集群的运行状况。 从整个配置过程中可以看到我们并没有指定每个节点的角色而Dledger集群就自动将192.168.189.132也就是worker2上的broker服务选举成了master。 停止woker2上的broker服务mqshutdown broker命令再重新观察集群的运行状况。RocketMQ会在发现worker2服务宕机后很快的选举产生新的master节点。但具体选举出worker1还是worker3作为master则是随机的。这里选择了worker1 但是如果继续停止worker1上的broker服务那么集群中宕机的broker服务就超过了半数也就是两台。这时这个Dledger集群就选举不出master节点也就无法正常工作了。 关于Dledger集群的一些补充
Dledger集群机制是RocketMQ自4.5版本开始支持的一个重要特性。它其实是由OpenMessage组织带入RocketMQ的一个系列框架。它是一个为高可用、高性能、高可靠的分布式存储系统提供基础支持的组件。它做的事情主要有两个一是在集群中选举产生master节点。RocketMQ集群需要用这个master节点响应客户端的各种请求。二是在各种复杂的分布式场景下保证CommitLog日志文件在集群中的强一致性。 以下是ChatGPT对于Dledger的功能描述
RocketMQ是一款分布式消息队列系统主要用于处理大量数据的实时传输和处理。在RocketMQ中DLedger是一个为高可用、高性能、高可靠的分布式存储系统提供基础支持的组件。DLedger集群主要具有以下功能
数据复制DLedger集群通过raft协议来保证数据的一致性。在集群中每个节点都维护一个相同的数据副本以确保当某个节点出现故障时数据不会丢失。容错性DLedger集群具有很高的容错性。即使集群中的部分节点发生故障只要集群中有大多数节点即超过半数仍在正常工作整个集群将继续提供服务。高可用性DLedger集群通过负载均衡和热备份等机制确保在节点故障时能够快速切换到其他正常节点提高整个系统的可用性。分布式锁DLedger集群提供分布式锁功能可以解决分布式系统中的资源争用问题实现跨节点的资源同步。强一致性DLedger集群通过使用Raft一致性协议确保在多个副本节点之间同步数据保证数据的强一致性。高性能DLedger集群支持水平扩展可以通过增加节点来提高系统的吞吐量和存储能力以满足不断增长的业务需求。易于管理DLedger集群提供了一系列管理和监控功能使运维人员可以更方便地掌握系统的运行状况及时发现和解决问题。
总之RocketMQ的DLedger集群具有高可用、高性能、高可靠等特点为分布式消息队列系统提供了坚实的基础。 其背后的核心就是Raft协议。这是一种强大的分布式选举算法其核心是只要集群中超过半数的节点作出的共同决议就认为是集群最终的共同决议。
Raft协议通过投票机制保持数据一致性。详细的细节这里不做过多讨论只介绍一下Raft协议一个很强大的地方就是它解决了分布式集群中的脑裂问题。 关于脑裂问题这是在集群选举过程中一个出现概率不高但是让很多人头疼的问题。在分布式集群内有可能会由于网络波动或者其他一些不稳定因素造成集群内节点之间短时间通信不畅通。这时就容易在集群内形成多个包含多个节点的小集合。这些集合就会独立进行选举各自产生新的Master节点。当网络恢复畅通后集群中就有了多个Master节点。当集群中出现多个Master节点后其他节点就不知道要听从谁的指令了从而造成集群整体工作瘫痪。也就是俗话说的“一山不容二虎”。脑裂问题在以Zookeeper为代表的早前一代分布式一致性产品中是一个非常头疼的问题。而Raft协议对于脑裂问题会采用随机休眠的机制彻底解决脑裂问题。RocketMQ是Raft协议的一个重要的成功示例。Kafka也在之后基于Raft协议自行实现了Kraft集群机制。 ChatGPT对于脑裂问题的介绍
问题Dledger集群如何防止集群脑裂问题
DLedger集群通过使用Raft协议来防止集群脑裂split-brain问题。脑裂问题是指在分布式系统中由于网络分区或其他原因导致集群被分割成两个或多个子集群各自独立运行且无法感知到其他子集群的存在。这可能导致数据不一致和错误决策。Raft协议采用了一系列措施来避免脑裂问题的发生
选举机制Raft协议的基础是选举出一个领导者Leader其他节点Follower都从领导者获取数据。选举过程要求候选人必须获得集群中大多数节点的支持才能成为领导者。这确保了集群中只能有一个领导者从而避免了脑裂问题。任期TermRaft协议为每个选举周期设置了一个递增的任期编号。任期编号用于标识当前的领导者确保旧的领导者不会再次被选为领导者。如果一个节点发现自己的任期小于其他节点那么它会停止当前的工作并更新自己的任期。心跳机制领导者会定期向其他节点发送心跳消息以保持与Follower节点的连接。当一个节点长时间未收到领导者的心跳时它会认为当前领导者失效并启动新一轮选举。这确保了当领导者出现故障时系统能够快速地选出新的领导者。日志复制领导者负责将数据更新日志条目复制到其他节点。Follower节点只有在收到领导者的日志条目并将其写入本地日志后才会响应客户端的请求。这确保了在发生脑裂情况下不会出现多个节点试图同时修改同一份数据的情况。
通过以上措施DLedger集群利用Raft协议避免了脑裂问题的发生保证了系统的高可用性和数据一致性。 注Dledger集群由于会接管RocketMQ原生的文件写入所以Dledger集群的文件写入速度比RocketMQ的原生写入方式是要慢一点的。这会对RocketMQ的性能产生一些影响。所以当前版本的Dledger集群在企业中用得并不是太多。5.0版本对Dledger集群抽出了一种Dledger Controller模式也就是只用Dledger集群的选举功能而不用它的Commit文件写入功能这样性能可以得到一定的提升。