山东济南网站开发,郑州网站网络推广公司,在网上怎样卖货,优化一个网站文章目录 前言一、单台集群部署二、多台集群部署1.修改配置2.dashboard修改 三、整合springboot1.引入pom和修改yml2.编写消费者3.编写生产者4.测试效果 总结 前言 RocketMQ集群方式有好几种 官网地址 https://rocketmq.apache.org/zh/docs/4.x/deployment/01deploy
2m-2s-asy… 文章目录 前言一、单台集群部署二、多台集群部署1.修改配置2.dashboard修改 三、整合springboot1.引入pom和修改yml2.编写消费者3.编写生产者4.测试效果 总结 前言 RocketMQ集群方式有好几种 官网地址 https://rocketmq.apache.org/zh/docs/4.x/deployment/01deploy
2m-2s-async2主2从异步刷盘(吞吐量较大但是消息可能丢失2m-2s-sync2主2从同步刷盘(吞吐量会下降但是消息更安全)2m-noslave 2主无从(单点故障)然后还可以直接配置broker.conf进行单点环境配置dledger用来实现主从切换的。集群中的节点会基于Raft协议随机选举出一个leader 其他的就都是follower。通常正式环境都会采用这种方式来搭建集群。 dledger搭建参考文档 https://rocketmq.apache.org/zh/docs/4.x/bestPractice/02dledger
MQ安装部署请看这篇https://blog.csdn.net/HBliucheng/article/details/135357998
搭建过程中踩过的坑也也会记录下来
一、单台集群部署
## 启动
nohup sh bin/dledger/fast-try.sh start
## 关闭
nohup sh bin/dledger/fast-try.sh stop先启动 fast-try.sh start 启动时发现权限不足 nohup: 无法运行命令bin/mqbroker: 权限不够 查看启动脚本
cat bin/dledger/fast-try.sh那我们就修改下nohup 后面加上sh 修改后如下
function startNameserver() {export JAVA_OPT_EXT -Xms512m -Xmx512m nohup sh bin/mqnamesrv
}function startBroker() {export JAVA_OPT_EXT -Xms1g -Xmx1g conf_name$1nohup sh bin/mqbroker -c $conf_name
}
再次启动发现可以了 执行命令 查看集群情况 BID 0的是主节点
sh bin/mqadmin clusterList -n 127.0.0.1:9876再看看dashboarb 启动之前请先开放6个端口 如果还有端口访问不了的请自行开放出来
firewall-cmd --zonepublic --add-port30909/tcp --permanent
firewall-cmd --zonepublic --add-port30911/tcp --permanent
firewall-cmd --zonepublic --add-port30919/tcp --permanent
firewall-cmd --zonepublic --add-port30921/tcp --permanent
firewall-cmd --zonepublic --add-port30929/tcp --permanent
firewall-cmd --zonepublic --add-port30931/tcp --permanent### 如果不想一次次开放下面命令也可以
firewall-cmd --zonepublic --add-port30900-30930/tcp --permanent
## 重启防火墙
systemctl reload firewalld
## 查看开放的端口
firewall-cmd --list-ports
## 其它命令
### 关闭端口
firewall-cmd --zonepublic --remove-port30909/tcp --permanent 启动生产者和消费者再看 master消费一个 停止master lsof -i:30911## 找到pid杀死 我的是118276kill 118276我们再启动 被杀死的broker
nohup sh bin/mqbroker -c conf/dledger/broker-n0.conf 发现30911作为slave回来了
二、多台集群部署
先准备三台机器 192.168.141.101 192.168.141.102 192.168.141.103
1.修改配置
192.168.141.101修改如下 profile不修改也可以
vim /etc/profile
## 加入192.168.141.102 192.168.141.103 同理102 103也改成这样
export NAMESRV_ADDR192.168.141.101:9876
source /etc/profile修改 broker.conf 后面我们启动哪个就修改哪个 我是把 broker-n0.conf复制一份到broker.conf也可以直接修改broker-n0.conf启动时启动自己配置的conf文件就可以
cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0/conf/dledger
cp broker-n0.conf broker.conf
vim broker.conf
## 修改的配置如下改动地方namesrvAddr dLegerPeers dLegerSelfId
brokerClusterName RaftCluster
brokerNameRaftNode00
listenPort30911
namesrvAddr192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
storePathRootDir/tmp/rmqstore/node00
storePathCommitLog/tmp/rmqstore/node00/commitlog
enableDLegerCommitLogtrue
dLegerGroupRaftNode00
dLegerPeersn0-192.168.141.101:40911;n1-192.168.141.102:40911;n2-192.168.141.103:40911
## must be unique
dLegerSelfIdn0
sendMessageThreadPoolNums16
192.168.141.102修改如下 profile不修改也可以
vim /etc/profile
## 加入192.168.141.102 192.168.141.103 同理102 103也改成这样
export NAMESRV_ADDR92.168.141.102:9876
source /etc/profilecd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0/conf/dledger
cp broker-n0.conf broker.conf
vim broker.conf
## 修改的配置如下改动地方namesrvAddr dLegerPeers dLegerSelfId
brokerClusterName RaftCluster
brokerNameRaftNode00
listenPort30911
namesrvAddr192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
storePathRootDir/tmp/rmqstore/node00
storePathCommitLog/tmp/rmqstore/node00/commitlog
enableDLegerCommitLogtrue
dLegerGroupRaftNode00
dLegerPeersn0-192.168.141.101:40911;n1-192.168.141.102:40911;n2-192.168.141.103:40911
## must be unique
dLegerSelfIdn1
sendMessageThreadPoolNums16
192.168.141.103修改如下 profile不修改也可以
vim /etc/profile
## 加入192.168.141.102 192.168.141.103 同理102 103也改成这样
export NAMESRV_ADDR192.168.141.103:9876cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0/conf/dledger
cp broker-n0.conf broker.conf
vim broker.conf
## 修改的配置如下改动地方namesrvAddr dLegerPeers dLegerSelfId
brokerClusterName RaftCluster
brokerNameRaftNode00
listenPort30911
namesrvAddr192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
storePathRootDir/tmp/rmqstore/node00
storePathCommitLog/tmp/rmqstore/node00/commitlog
enableDLegerCommitLogtrue
dLegerGroupRaftNode00
dLegerPeersn0-192.168.141.101:40911;n1-192.168.141.102:40911;n2-192.168.141.103:40911
## must be unique
dLegerSelfIdn2
sendMessageThreadPoolNums16
开放端口 每台机器都要开放
firewall-cmd --zonepublic --add-port30911/tcp --permanent
firewall-cmd --zonepublic --add-port40911/tcp --permanent
systemctl reload firewalld如果还有端口没开放请自行开放
启动 每台机器都要启动
cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0
nohup sh bin/mqnamesrv
nohup sh bin/mqbroker -c conf/dledger/broker.conf 查看日志 发现未创建文件夹创建文件夹 mkdir -p /tmp/rmqstore/node00/commitlog## 关掉再启动
sh bin/mqshutdown broker
## 启动broker
nohup sh bin/mqbroker -c conf/dledger/broker.conf 查看集群情况
sh bin/mqadmin clusterList -n 127.0.0.1:9876踩坑 这个值不要随便写这里从0开始递增 不然选举会有问题
2.dashboard修改
修改配置
## 根据自己的服务器地址修改注意中间是分号不是逗号
rocketmq.config.namesrvAddr192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876启动访问 关闭master再查看集群情况然后再重启和前面的单机集群一样的大家可自行测试
三、整合springboot
1.引入pom和修改yml dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.2.2/version/dependency
rocketmq:
# 集群中间以分号隔开name-server: 192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876producer:group: my_group_test
2.编写消费者
package com.study.config.rocketmq;import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import java.nio.charset.Charset;/*** author: * time: 2024/1/5 10:00*/
Component
RocketMQMessageListener(consumerGroup my_group_test,topic topic_test,selectorType SelectorType.TAG,selectorExpression tagA)
Slf4j
public class MQMsgListener implements RocketMQListenerMessageExt {Overridepublic void onMessage(MessageExt message) {String msgId message.getMsgId();String msg new String(message.getBody(), CharsetUtil.UTF_8);log.info(msgId{} msg{},msgId,msg);}
}
RocketMQMessageListener 注解参数如下
topic: 消费者订阅的主题即消费者将从这个主题中接收消息。consumerGroup: 消费者组多个消费者可以组成一个消费者组共同从一个主题中接收消息。consumeMode: 消费模式指定消费者是以并发的方式接收消息还是以有序的方式接收消息。并发模式下多个消费者可以同时接收消息有序模式下每个消费者按照消息的顺序依次接收消息。messageModel: 消息模式指定消息是以集群模式还是广播模式发送。集群模式下消息将被发送到同一个主题的其中一个消费者广播模式下消息将被发送到主题的所有消费者。selectorType: 过滤消息的方式可以使用标签(Tag)或SQL92表达式(SQL92)来过滤消息。selectorExpression: 过滤消息的表达式可以使用标签(Tag)或SQL92表达式(SQL92)来指定过滤条件。maxReconsumeTimes: 消息消费失败后可被重复投递的最大次数。超过最大重试次数后消息将被放入死信队列。delayLevelWhenNextConsume: 并发模式的消息重试策略指定消息消费失败后的重试延迟级别。设置为-1时表示无需重试直接将消息放入死信队列。
3.编写生产者
package com.study.controller;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;/*** author: * time: 2024/1/5 10:17*/
RestController
RequestMapping(/mq)
Slf4j
public class RocketMQProducerController {ResourceRocketMQTemplate rocketMQTemplate;PostMapping(/sendMessage)ResponseBodypublic void sendMessage(String msg){rocketMQTemplate.asyncSend(topic_test, hello mq, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {log.info(msgId{},sendResult.getMsgId());}Overridepublic void onException(Throwable e) {e.printStackTrace();}});}
}
同步会有一点小问题第一次启动不会消费直接写成异步
4.测试效果
发现没有主题 追踪源码发现主题和过滤消息的表达式按照冒号分割 topic取第一位过滤表达式取第二位
修改再试下 rocketMQTemplate.asyncSend(topic_test:tagA, hello mq, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {log.info(msgId{},sendResult.getMsgId());}Overridepublic void onException(Throwable e) {e.printStackTrace();}});发现可以了 前面写了个java客户端的消费者改下消费组发现也可以消费
java客户端代码
package com.bsoft;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;/*** author: liucheng* time: 2023/12/29 15:39*/
public class MQConsumer {private final static String nameServer 192.168.141.101:9876;private final static String consumerGroup my_group_test02;private final static String topic topic_test;public static void main(String[] args) throws MQClientException, IOException, InterruptedException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(consumerGroup);// 设置NameServer的地址consumer.setNamesrvAddr(nameServer);// 订阅一个或者多个Topic以及Tag来过滤需要消费的消息consumer.subscribe(topic, tagA);// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msgs);msgs.forEach((msg)-{byte[] body msg.getBody();String s new String(body, Charset.defaultCharset());System.out.println(msg s);});// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();System.out.printf(Consumer Started......);
// Thread.sleep(5000);
// consumer.shutdown();System.in.read();}
} 到此集群搭建完成大家搭建过程中有遇到问题可以交流
总结
整个搭建过程不难就是有点繁琐需要配置多台服务器 其中配置brocker.conf时dLegerSelfId值这块要注意 ,dLegerSelfId是节点 id, 必须属于 dLegerPeers 中的一个同 Group 内各个节点要唯一。这个值从0开始递增 同一台服务器上启动时先启动 namesrv 再启动 broker