当前位置: 首页 > news >正文

山东济南网站开发郑州网站网络推广公司

山东济南网站开发,郑州网站网络推广公司,在网上怎样卖货,优化一个网站文章目录 前言一、单台集群部署二、多台集群部署1.修改配置2.dashboard修改 三、整合springboot1.引入pom和修改yml2.编写消费者3.编写生产者4.测试效果 总结 前言 RocketMQ集群方式有好几种 官网地址 https://rocketmq.apache.org/zh/docs/4.x/deployment/01deploy 2m-2s-asy… 文章目录 前言一、单台集群部署二、多台集群部署1.修改配置2.dashboard修改 三、整合springboot1.引入pom和修改yml2.编写消费者3.编写生产者4.测试效果 总结 前言 RocketMQ集群方式有好几种 官网地址 https://rocketmq.apache.org/zh/docs/4.x/deployment/01deploy 2m-2s-async2主2从异步刷盘(吞吐量较大但是消息可能丢失2m-2s-sync2主2从同步刷盘(吞吐量会下降但是消息更安全)2m-noslave 2主无从(单点故障)然后还可以直接配置broker.conf进行单点环境配置dledger用来实现主从切换的。集群中的节点会基于Raft协议随机选举出一个leader 其他的就都是follower。通常正式环境都会采用这种方式来搭建集群。 dledger搭建参考文档 https://rocketmq.apache.org/zh/docs/4.x/bestPractice/02dledger MQ安装部署请看这篇https://blog.csdn.net/HBliucheng/article/details/135357998 搭建过程中踩过的坑也也会记录下来 一、单台集群部署 ## 启动 nohup sh bin/dledger/fast-try.sh start ## 关闭 nohup sh bin/dledger/fast-try.sh stop先启动 fast-try.sh start 启动时发现权限不足 nohup: 无法运行命令bin/mqbroker: 权限不够 查看启动脚本 cat bin/dledger/fast-try.sh那我们就修改下nohup 后面加上sh 修改后如下 function startNameserver() {export JAVA_OPT_EXT -Xms512m -Xmx512m nohup sh bin/mqnamesrv }function startBroker() {export JAVA_OPT_EXT -Xms1g -Xmx1g conf_name$1nohup sh bin/mqbroker -c $conf_name } 再次启动发现可以了 执行命令 查看集群情况 BID 0的是主节点 sh bin/mqadmin clusterList -n 127.0.0.1:9876再看看dashboarb 启动之前请先开放6个端口 如果还有端口访问不了的请自行开放出来 firewall-cmd --zonepublic --add-port30909/tcp --permanent firewall-cmd --zonepublic --add-port30911/tcp --permanent firewall-cmd --zonepublic --add-port30919/tcp --permanent firewall-cmd --zonepublic --add-port30921/tcp --permanent firewall-cmd --zonepublic --add-port30929/tcp --permanent firewall-cmd --zonepublic --add-port30931/tcp --permanent### 如果不想一次次开放下面命令也可以 firewall-cmd --zonepublic --add-port30900-30930/tcp --permanent ## 重启防火墙 systemctl reload firewalld ## 查看开放的端口 firewall-cmd --list-ports ## 其它命令 ### 关闭端口 firewall-cmd --zonepublic --remove-port30909/tcp --permanent 启动生产者和消费者再看 master消费一个 停止master lsof -i:30911## 找到pid杀死 我的是118276kill 118276我们再启动 被杀死的broker nohup sh bin/mqbroker -c conf/dledger/broker-n0.conf 发现30911作为slave回来了 二、多台集群部署 先准备三台机器 192.168.141.101 192.168.141.102 192.168.141.103 1.修改配置 192.168.141.101修改如下 profile不修改也可以 vim /etc/profile ## 加入192.168.141.102 192.168.141.103 同理102 103也改成这样 export NAMESRV_ADDR192.168.141.101:9876 source /etc/profile修改 broker.conf 后面我们启动哪个就修改哪个 我是把 broker-n0.conf复制一份到broker.conf也可以直接修改broker-n0.conf启动时启动自己配置的conf文件就可以 cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0/conf/dledger cp broker-n0.conf broker.conf vim broker.conf ## 修改的配置如下改动地方namesrvAddr dLegerPeers dLegerSelfId brokerClusterName RaftCluster brokerNameRaftNode00 listenPort30911 namesrvAddr192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876 storePathRootDir/tmp/rmqstore/node00 storePathCommitLog/tmp/rmqstore/node00/commitlog enableDLegerCommitLogtrue dLegerGroupRaftNode00 dLegerPeersn0-192.168.141.101:40911;n1-192.168.141.102:40911;n2-192.168.141.103:40911 ## must be unique dLegerSelfIdn0 sendMessageThreadPoolNums16 192.168.141.102修改如下 profile不修改也可以 vim /etc/profile ## 加入192.168.141.102 192.168.141.103 同理102 103也改成这样 export NAMESRV_ADDR92.168.141.102:9876 source /etc/profilecd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0/conf/dledger cp broker-n0.conf broker.conf vim broker.conf ## 修改的配置如下改动地方namesrvAddr dLegerPeers dLegerSelfId brokerClusterName RaftCluster brokerNameRaftNode00 listenPort30911 namesrvAddr192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876 storePathRootDir/tmp/rmqstore/node00 storePathCommitLog/tmp/rmqstore/node00/commitlog enableDLegerCommitLogtrue dLegerGroupRaftNode00 dLegerPeersn0-192.168.141.101:40911;n1-192.168.141.102:40911;n2-192.168.141.103:40911 ## must be unique dLegerSelfIdn1 sendMessageThreadPoolNums16 192.168.141.103修改如下 profile不修改也可以 vim /etc/profile ## 加入192.168.141.102 192.168.141.103 同理102 103也改成这样 export NAMESRV_ADDR192.168.141.103:9876cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0/conf/dledger cp broker-n0.conf broker.conf vim broker.conf ## 修改的配置如下改动地方namesrvAddr dLegerPeers dLegerSelfId brokerClusterName RaftCluster brokerNameRaftNode00 listenPort30911 namesrvAddr192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876 storePathRootDir/tmp/rmqstore/node00 storePathCommitLog/tmp/rmqstore/node00/commitlog enableDLegerCommitLogtrue dLegerGroupRaftNode00 dLegerPeersn0-192.168.141.101:40911;n1-192.168.141.102:40911;n2-192.168.141.103:40911 ## must be unique dLegerSelfIdn2 sendMessageThreadPoolNums16 开放端口 每台机器都要开放 firewall-cmd --zonepublic --add-port30911/tcp --permanent firewall-cmd --zonepublic --add-port40911/tcp --permanent systemctl reload firewalld如果还有端口没开放请自行开放 启动 每台机器都要启动 cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0 nohup sh bin/mqnamesrv nohup sh bin/mqbroker -c conf/dledger/broker.conf 查看日志 发现未创建文件夹创建文件夹 mkdir -p /tmp/rmqstore/node00/commitlog## 关掉再启动 sh bin/mqshutdown broker ## 启动broker nohup sh bin/mqbroker -c conf/dledger/broker.conf 查看集群情况 sh bin/mqadmin clusterList -n 127.0.0.1:9876踩坑 这个值不要随便写这里从0开始递增 不然选举会有问题 2.dashboard修改 修改配置 ## 根据自己的服务器地址修改注意中间是分号不是逗号 rocketmq.config.namesrvAddr192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876启动访问 关闭master再查看集群情况然后再重启和前面的单机集群一样的大家可自行测试 三、整合springboot 1.引入pom和修改yml dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.2.2/version/dependency rocketmq: # 集群中间以分号隔开name-server: 192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876producer:group: my_group_test 2.编写消费者 package com.study.config.rocketmq;import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.SelectorType; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component;import java.nio.charset.Charset;/*** author: * time: 2024/1/5 10:00*/ Component RocketMQMessageListener(consumerGroup my_group_test,topic topic_test,selectorType SelectorType.TAG,selectorExpression tagA) Slf4j public class MQMsgListener implements RocketMQListenerMessageExt {Overridepublic void onMessage(MessageExt message) {String msgId message.getMsgId();String msg new String(message.getBody(), CharsetUtil.UTF_8);log.info(msgId{} msg{},msgId,msg);} } RocketMQMessageListener 注解参数如下 topic: 消费者订阅的主题即消费者将从这个主题中接收消息。consumerGroup: 消费者组多个消费者可以组成一个消费者组共同从一个主题中接收消息。consumeMode: 消费模式指定消费者是以并发的方式接收消息还是以有序的方式接收消息。并发模式下多个消费者可以同时接收消息有序模式下每个消费者按照消息的顺序依次接收消息。messageModel: 消息模式指定消息是以集群模式还是广播模式发送。集群模式下消息将被发送到同一个主题的其中一个消费者广播模式下消息将被发送到主题的所有消费者。selectorType: 过滤消息的方式可以使用标签(Tag)或SQL92表达式(SQL92)来过滤消息。selectorExpression: 过滤消息的表达式可以使用标签(Tag)或SQL92表达式(SQL92)来指定过滤条件。maxReconsumeTimes: 消息消费失败后可被重复投递的最大次数。超过最大重试次数后消息将被放入死信队列。delayLevelWhenNextConsume: 并发模式的消息重试策略指定消息消费失败后的重试延迟级别。设置为-1时表示无需重试直接将消息放入死信队列。 3.编写生产者 package com.study.controller;import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;/*** author: * time: 2024/1/5 10:17*/ RestController RequestMapping(/mq) Slf4j public class RocketMQProducerController {ResourceRocketMQTemplate rocketMQTemplate;PostMapping(/sendMessage)ResponseBodypublic void sendMessage(String msg){rocketMQTemplate.asyncSend(topic_test, hello mq, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {log.info(msgId{},sendResult.getMsgId());}Overridepublic void onException(Throwable e) {e.printStackTrace();}});} } 同步会有一点小问题第一次启动不会消费直接写成异步 4.测试效果 发现没有主题 追踪源码发现主题和过滤消息的表达式按照冒号分割 topic取第一位过滤表达式取第二位 修改再试下 rocketMQTemplate.asyncSend(topic_test:tagA, hello mq, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {log.info(msgId{},sendResult.getMsgId());}Overridepublic void onException(Throwable e) {e.printStackTrace();}});发现可以了 前面写了个java客户端的消费者改下消费组发现也可以消费 java客户端代码 package com.bsoft;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt;import java.io.IOException; import java.nio.charset.Charset; import java.util.List;/*** author: liucheng* time: 2023/12/29 15:39*/ public class MQConsumer {private final static String nameServer 192.168.141.101:9876;private final static String consumerGroup my_group_test02;private final static String topic topic_test;public static void main(String[] args) throws MQClientException, IOException, InterruptedException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(consumerGroup);// 设置NameServer的地址consumer.setNamesrvAddr(nameServer);// 订阅一个或者多个Topic以及Tag来过滤需要消费的消息consumer.subscribe(topic, tagA);// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msgs);msgs.forEach((msg)-{byte[] body msg.getBody();String s new String(body, Charset.defaultCharset());System.out.println(msg s);});// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();System.out.printf(Consumer Started......); // Thread.sleep(5000); // consumer.shutdown();System.in.read();} } 到此集群搭建完成大家搭建过程中有遇到问题可以交流 总结 整个搭建过程不难就是有点繁琐需要配置多台服务器 其中配置brocker.conf时dLegerSelfId值这块要注意 ,dLegerSelfId是节点 id, 必须属于 dLegerPeers 中的一个同 Group 内各个节点要唯一。这个值从0开始递增 同一台服务器上启动时先启动 namesrv 再启动 broker
http://www.zqtcl.cn/news/111250/

相关文章:

  • 石家庄门户网站建设免费简历模板的网站
  • 微网站建设市场如何做好平台推广
  • 网站不备案做优化小程序开发前景怎么样
  • 美丽说网站优化百度关键词优化
  • 同性男做的视频网站赶集网招聘最新招聘附近找工作
  • 做挖机配件销售的网站oa办公系统软件哪家好
  • 聊城设计网站商务网站的特点
  • 厦门做个网站多少钱工程建设范围
  • 百度推广官方网站在哪里制作网页
  • 济南集团网站建设方案沈阳手机网站制作
  • 网站备案号注销的结果做网站的外包能学到什么
  • 在线购物网站开发项目网站建设电话推广话术
  • 网站主体信息太原站扩建
  • 西平县住房和城乡建设局网站空间商网站
  • p2p网站建设cms一键生成图片
  • 甘肃省第八建设集团公司网站能够做物理题的网站
  • 团购网站建设方案建筑工程网校官网
  • 佛山建站网站模板小公司管理方法
  • 常德住房和城乡建设局网站做风险代理案源的网站
  • 手机网站开发人员选项wordpress加载媒体库
  • 做钓鱼网站用哪种编程语言张家界有实力seo优化费用
  • 如何做一个主题网站做网站必须有框架么
  • 建设网站需要什么知识上海高端网页设计
  • 电子商务网站建设基本流程公司网站建设平台
  • 域名没过期 网站打不开怎么办素马设计顾问讲解价格
  • 怎么做非法彩票网站贵州网站开发哪家便宜
  • 青岛市医疗保险网站wordpress七牛云
  • 哪个浏览器可以做网站查询网站的外链
  • 浅析社区网站的建设有了网站源码 怎么建设网站
  • 苏州网站排名优化系统网页设计师