网站优化新闻,关键词挖掘工具爱网,最容易做流量的网站,国外家装设计网站大家好#xff0c;我是烤鸭#xff1a; 今天看下rocketmq。这篇主要是简单介绍下 rocketmq以及idea 本地调试 rocketmq。
项目架构
感兴趣的可以下载源码看下。
https://github.com/apache/rocketmq
项目结构图。
rocketmq-acl: acl 秘钥方式的鉴权#xff0c;用在bro…大家好我是烤鸭 今天看下rocketmq。这篇主要是简单介绍下 rocketmq以及idea 本地调试 rocketmq。
项目架构
感兴趣的可以下载源码看下。
https://github.com/apache/rocketmq
项目结构图。
rocketmq-acl: acl 秘钥方式的鉴权用在broker端。 rocketmq-broker整个mq的核心他能够接受producer和consumer的请求并调用store层服务对消息进行处理。HA服务的基本单元支持同步双写异步双写等模式。 rocketmq-clientmq客户端实现目前官方仅仅开源了java版本的mq客户端cgo客户端有社区开源贡献。 rocketmq-common一些模块间通用的功能类比如一些配置文件、常量。 rocketmq-distribution脚本、配置模块。 rocketmq-example官方提供的例子。 rocketmq-filtersrv消息过滤服务相当于在broker和consumer中间加入了一个filter代理。 rocketmq-logappender日志 rocketmq-logging日志 rocketmq-namesrvNameServer类似服务注册中心broker在这里注册consumer和producer在这里找到broker地址 rocketmq-openmessagingRocketMQ支持openmessaging详见https://rocketmq.apache.org/docs/openmessaging-example/ rocketmq-remoting基于netty的底层通信实现所有服务间的交互都基于此模块。 rocketmq-srvut解析命令行的工具类。 rocketmq-store存储层实现同时包括了索引服务高可用HA服务实现。 rocketmq-tools命令行工具提供了消息查询等功能。
下面重点说一下几个模块
注册中心 namesrv、broker、client 和 store先看一下关系。
看这个图是不是有点相似没错跟 dubbo 很像除了多了 broker。
nameserver 是注册中心用来记录broker信息、broker和topic关系。
producer 从nameserver 获取broker信息进行消息发送。
consumer 从nameserver 获取broker信息进行消息消费。
idea 导入源码本地调试
设置 rocketmq _home 目录后边的namesrv和broker会用到。新建conf目录并将 rocket-distribution 的conf里的broker.conf、logback_broker.xml、
logback_namesrv.xml、logback_tools.xml 复制到新建的conf目录中。我这里设置的目录是 E:\my\rocketmq 我这里修改了日志目录方便查看日志。
启动 NamesrvStartup
Connected to the target VM, address: 127.0.0.1:58819, transport: socket
Please set the ROCKETMQ_HOME variable in your environment to match the location of the RocketMQ installation
Disconnected from the target VM, address: 127.0.0.1:58819, transport: socket启动参数配置 rocketmq_home 的环境变量ROCKETMQ_HOMEE:\my\rocketmq 启动成功
Connected to the target VM, address: 127.0.0.1:50261, transport: socket
The Name Server boot success. serializeTypeJSON会发现 rocketmq_home 目录下生成了 logs/rocketmqlogs 目录存放的是日志文件。
启动broker
设置启动参数和 rocketmq_home 的环境变量
autoCreateTopicEnabletrue 是为了测试的时候可以发送时创建topic默认是 false(不建议开启避免并发发送时topic重复问题)
-c E:\my\rocketmq\conf\broker.conf -n localhost:9876 autoCreateTopicEnabletrueROCKETMQ_HOMEE:\my\rocketmq会发现 rocketmq_home 目录下生成了 store 目录存放的是broker维护的信息像消费者的偏移量、延迟队列的偏移量、topic。
启动consumer
rocketmq-example 项目下example\src\main\java\org\apache\rocketmq\example\quickstart\Consumer.java
指定broker地址
consumer.setNamesrvAddr(localhost:9876);启动producer并发送消息
rocketmq-example 项目下example\src\main\java\org\apache\rocketmq\example\quickstart\Producer.java
指定broker地址修改循环次数为2次
producer.setNamesrvAddr(localhost:9876);发送成功
SendResult [sendStatusSEND_OK, msgId7F000001395C18B4AAC22C7A99940000, offsetMsgId0AA80D1200002A9F0000000000000000, messageQueueMessageQueue [topicTopicTest, brokerNamebroker-a, queueId3], queueOffset0]
SendResult [sendStatusSEND_OK, msgId7F000001395C18B4AAC22C7A99D80001, offsetMsgId0AA80D1200002A9F00000000000000C9, messageQueueMessageQueue [topicTopicTest, brokerNamebroker-a, queueId0], queueOffset0]消费端接收成功
ConsumeMessageThread_1 Receive_1 New Messages: [MessageExt [brokerNamebroker-a, queueId3, storeSize201, queueOffset0, sysFlag0, bornTimestamp1625815032213, bornHost/10.168.13.18:57729, storeTimestamp1625815032241, storeHost/10.168.13.18:10911, msgId0AA80D1200002A9F0000000000000000, commitLogOffset0, bodyCRC613185359, reconsumeTimes0, preparedTransactionOffset0, toString()Message{topicTopicTest, flag0, properties{MIN_OFFSET0, MAX_OFFSET1, CONSUME_START_TIME1625815055025, UNIQ_KEY7F000001395C18B4AAC22C7A99940000, CLUSTERDefaultCluster, WAITtrue, TAGSTagA}, body[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionIdnull}]]
ConsumeMessageThread_2 Receive_1 New Messages: [MessageExt [brokerNamebroker-a, queueId0, storeSize201, queueOffset0, sysFlag0, bornTimestamp1625815032280, bornHost/10.168.13.18:57729, storeTimestamp1625815032282, storeHost/10.168.13.18:10911, msgId0AA80D1200002A9F00000000000000C9, commitLogOffset201, bodyCRC1401636825, reconsumeTimes0, preparedTransactionOffset0, toString()Message{topicTopicTest, flag0, properties{MIN_OFFSET0, MAX_OFFSET1, CONSUME_START_TIME1625815056025, UNIQ_KEY7F000001395C18B4AAC22C7A99D80001, CLUSTERDefaultCluster, WAITtrue, TAGSTagA}, body[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionIdnull}]] 当发送的时候 store目录下会生成 commitLog 目录(消息内容)和consumequeue目录(存的是topic和queueId)
commitLog目录 默认上来生成两个文件2个G。 consumequeue目录一级子目录是topic二级子目录是queueId mq 控制台
rocketmq-console 在另一个仓库地址
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
启动成功画面还是比较清新的 模拟发送100条可以看到每个broker的数量 console功能还是有很多可以再开发的地方官方基本不维护了需要的可能得二次开发了。
像我们就开发了类似报表、报警、监控等一些功能还是比较方便的。