企业网站建设需要准备资料,php网站链接支付宝,wordpress 文档阅读插件,泉州建设网站RocketMQ单机支持1万以上的持久化队列#xff0c;前提是足够的内存、硬盘空间#xff0c;过期数据数据删除#xff08;RocketMQ中的消息队列长度不是无限的#xff0c;只是足够大的内存数据定时删除#xff09; RocketMQ版本#xff1a;3.1.4 一#xff0c;部署NameServ… RocketMQ单机支持1万以上的持久化队列前提是足够的内存、硬盘空间过期数据数据删除RocketMQ中的消息队列长度不是无限的只是足够大的内存数据定时删除 RocketMQ版本3.1.4 一部署NameServer 1安装JDK并设置JAVA_HOME环境变量启动脚本依赖JAVA_HOME环境变量 2cd /alibaba-rocketmq/bin进入RocketMQ的bin目录 2调用nohup sh mqnamesrv 启动NameServer 报错如下 [plain] view plain copy : command not found : command not found mqnamesrv: line 35: syntax error: unexpected end of file 在bin目录下调用dos2unix *将所有文件转化为unix格式再次调用nohup sh mqnamesrv 报错如下 [plain] view plain copy /home/hadoop/alibaba-rocketmq Invalid initial heap size: -Xms4g The specified size exceeds the maximum representable size. Could not create the Java virtual machine. 由于安装的JDK版本为32位4g超过了JDK所支持的最大内存不过32位JDK也无法发挥出RocketMQ的优势换成64位JDK 这次启动成功 [plain] view plain copy [hadoophadoop bin]$ nohup sh mqnamesrv [1] 17676 [hadoophadoop bin]$ nohup: appending output to “nohup.out” [hadoophadoop bin]$ cat nohup.out The Name Server boot success. [hadoophadoop bin]$ jps 17682 NamesrvStartup 17800 Jps NameServer监听端口9876 [java] view plain copy nettyServerConfig.setListenPort(9876); 如果服务器内存不够可以修改runserver.sh脚本mqnamesrv文件中通过runserver.sh脚本调用Name Server的主函数com.alibaba.rocketmq.namesrv.NamesrvStartup启动Name Server中的JAVA_OPT_1参数 [plain] view plain copy JAVA_OPT_1-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize128m -XX:MaxPermSize320m 二部署Broker消息中转角色负责存储消息转发消息 Broker集群有多种配置方式 1单Master 优点除了配置简单没什么优点 缺点不可靠该机器重启或宕机将导致整个服务不可用 2多Master 优点配置简单性能最高 缺点可能会有少量消息丢失配置相关单台机器重启或宕机期间该机器下未被消费的消息在机器恢复前不可订阅影响消息实时性 3多Master多Slave每个Master配一个Slave有多对Master-Slave集群采用异步复制方式主备有短暂消息延迟毫秒级 优点性能同多Master几乎一样实时性高主备间切换对应用透明不需人工干预 缺点Master宕机或磁盘损坏时会有少量消息丢失 4多Master多Slave每个Master配一个Slave有多对Master-Slave集群采用同步双写方式主备都写成功向应用返回成功 优点服务可用性与数据可用性非常高 缺点性能比异步集群略低当前版本主宕备不能自动切换为主 Master和Slave的配置文件参考conf目录下的配置文件 Master与Slave通过指定相同的brokerName参数来配对Master的BrokerId必须是0Slave的BrokerId必须是大于0的数 一个Master下面可以挂载多个Slave同一Master下的多个Slave通过指定不同的BrokerId来区分 部署一Master一Slave集群采用异步复制方式 Master [plain] view plain copy [hadoophadoop bin]$ nohup sh mqbroker -n 192.168.58.163:9876 -c ../conf/2m-2s-async/broker-a.properties [2] 25493 [hadoophadoop bin]$ nohup: appending output to “nohup.out” [hadoophadoop bin]$ cat nohup.out load config properties file OK, ../conf/2m-2s-async/broker-a.properties The broker[broker-a, 192.168.58.163:10911] boot success. and name server is 192.168.58.163:9876 [hadoophadoop bin]$ jps 25500 BrokerStartup 25545 Jps 17682 NamesrvStartup Slave [plain] view plain copy [hadoophadoop bin]$ nohup sh mqbroker -n 192.168.58.163:9876 -c ../conf/2m-2s-async/broker-a-s.properties [1] 1974 [hadoophadoop bin]$ nohup: appending output to “nohup.out” [hadoophadoop bin]$ cat nohup.out load config properties file OK, ../conf/2m-2s-async/broker-a-s.properties The broker[broker-a, 192.168.58.164:10911] boot success. and name server is 192.168.58.163:9876 [hadoophadoop bin]$ jps 2071 Jps 1981 BrokerStartup Broker监听端口10911 [java] view plain copy nettyServerConfig.setListenPort(10911); 如果服务器内存不够可以修改runbroker.sh脚本mqbroker文件中通过runbroker.sh脚本调用Broker的主函数com.alibaba.rocketmq.broker.BrokerStartup启动Broker的JAVA_OPT_1参数 [plain] view plain copy JAVA_OPT_1-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize128m -XX:MaxPermSize320m 三Producer 必须要设置Name Server地址 [java] view plain copy package com.sean; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; public class Producer { public static void main(String[] args){ DefaultMQProducer producer new DefaultMQProducer(Producer); producer.setNamesrvAddr(192.168.58.163:9876); try { producer.start(); Message msg new Message(PushTopic, push, 1, Just for test..getBytes()); SendResult result producer.send(msg); System.out.println(id: result.getMsgId() result: result.getSendStatus()); msg new Message(PushTopic, push, 2, Just for test..getBytes()); result producer.send(msg); System.out.println(id: result.getMsgId() result: result.getSendStatus()); msg new Message(PullTopic, pull, 1, Just for test..getBytes()); result producer.send(msg); System.out.println(id: result.getMsgId() result: result.getSendStatus()); } catch (Exception e) { e.printStackTrace(); }finally{ producer.shutdown(); } } } 四Consumer 必须要设置Name Server地址 [java] view plain copy package com.sean; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args){ DefaultMQPushConsumer consumer new DefaultMQPushConsumer(PushConsumer); consumer.setNamesrvAddr(192.168.58.163:9876); try { //订阅PushTopic下Tag为push的消息 consumer.subscribe(PushTopic, push); //程序第一次启动从消息队列头取数据 consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener( new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( ListMessageExt list, ConsumeConcurrentlyContext Context) { Message msg list.get(0); System.out.println(msg.toString()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } ); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } } 先运行Consumer然后运行Producer Producer运行结果 [plain] view plain copy id:C0A83AA300002A9F00000000000009EA result:SEND_OK id:C0A83AA300002A9F0000000000000A77 result:SEND_OK id:C0A83AA300002A9F0000000000000B04 result:SEND_OK Consumer运行结果 MessageExt [queueId1, storeSize141, queueOffset6, sysFlag0, bornTimestamp1403765668792, bornHost/192.168.31.130:60985, storeTimestamp1403765527374, storeHost/192.168.58.163:10911, msgIdC0A83AA300002A9F0000000000000A77, commitLogOffset2679, bodyCRC753746584, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicPushTopic, flag0, properties{TAGSpush, KEYS2, WAITtrue, MAX_OFFSET7, MIN_OFFSET0}, body14]]
MessageExt [queueId0, storeSize141, queueOffset6, sysFlag0, bornTimestamp1403765668698, bornHost/192.168.31.130:60985, storeTimestamp1403765527356, storeHost/192.168.58.163:10911, msgIdC0A83AA300002A9F00000000000009EA, commitLogOffset2538, bodyCRC753746584, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicPushTopic, flag0, properties{TAGSpush, KEYS1, WAITtrue, MAX_OFFSET7, MIN_OFFSET0}, body14]]