RocketMQ使用指南1——简介和集群部署
目录
RocketMQ简介和集群架构与原理(本篇)
理解RocketMQ的消息模型并与Spring集成
RocketMQ源码解析
RocketMQ生产环境常见问题总结
主要内容
这一部分主要是了解RocketMQ和熟悉RocketMQ的部署。从安装搭建到部署监控平台,可以快速上手这一优秀的消息组件。
产品介绍
RocketMQ是阿里开源的一个消息中间件,在阿里内部经历了双十一的考验,能够处理亿万级别的消息。早起使用的ActiveMQ在消息量逐渐增多之后性能很快达到了瓶颈,而Kafka是针对日志收集等高并发场景设计的,不适用于多Topic的场景。所以阿里在吸取Kafka的优点后,自研了一个消息中间件产品并不断优化,促成了一个Apache顶级项目的诞生。
产品特点
当今MQ产品众多,其中影响力和使用范围最大的当属Kafka、RabbitMQ、RocketMQ和Pulsar。这几个产品在设计和实现上各有差异,造成他们适合于不用的细分场景。
优点 | 缺点 | 适合场景 | |
---|---|---|---|
Kafka | 吞吐量非常大,性能好,集群高可用 | 有丢数据可能,不适合大量topic,功能单一 | 日志分析、大数据采集 |
RabbitMQ | 可靠性高,支持AMQP协议,功能全面 | erlang语言不好扩展,吞吐量一般 | 企业内部小规模服务调用 |
Pulsar | 云原生计算与存储分离,支持跨数据中心容灾,支持多租户 | 新事物生态较少 | 日志分析、大数据采集、跨地区容灾 |
RocketMQ | 吞吐量高,功能全面,使用Java开发方便定制,丰富的管理工具 | 服务加载较慢 | 适合多种场景 |
快速搭建
搭建RocketMQ服务
RocketMQ官网地址为:https://rocketmq.apache.org/ ,下载地址为https://rocketmq.apache.org/download/。可以看到目前最新版本为5.3.0,而4.x的最后一个版本4.9.8已经于2024年3月停止维护,本次学习的还是4.x版本的,后续再会去学习5.x的新特性。
我们下载下来运行版本之后就可以上传到我们的服务器上并解压,解压后有几个重要的目录:
1 | [root@localhost ~]# cd /app/rocketmq-all-4.9.8-bin-release |
RocketMQ建议的运行环境为至少12G内存,但是我们这里学习阶段,虚拟机里面就给1G吧。此处需要修改bin
目录下runserver.sh
和runbroker.sh
两个脚本。 使用vi runserver.sh
命令,找到下面的配置并调整jvm的内存大小。
1 | JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" |
使用vi runbroker.sh
命令,找到下面的配置并调整jvm的内存大小。
1 | JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g" |
然后在安装完jdk之后,RocketMQ就可以启动了。
启动nameserver服务
在nohup.out中看到success字样说明nameserver就启动成功了。 使用jps命令可以看到一个java的应用正在运行:1
2
3
4
5
6
7
8
9[root@localhost rocketmq-all-4.9.8-bin-release]# nohup bin/mqnamesrv &
[1] 9187
[root@localhost rocketmq-all-4.9.8-bin-release]# nohup: ignoring input and appending output to ‘nohup.out’
^C
[root@localhost rocketmq-all-4.9.8-bin-release]# view nohup.out
OpenJDK 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON1
29348 Jps
9206 NamesrvStartup启动broker服务 首先修改broker.conf中的一个配置,可以让broker自动创建topic。
另外,如果你的服务器配置了多张网卡,例如阿里云,腾讯云这样的云服务器,他们通常有内网网卡和外网网卡两张网卡,那么需要增加配置brokerIP1属性,指向服务器的外网IP地址,这样才能确保从其他服务器上访问到RocketMQ服务。 和启动nameserver一样启动broker服务:1
2
3[root@localhost rocketmq-all-4.9.8-bin-release]# vi conf/broker.conf
#在配置文件的最后添加一行
autoCreateTopicEnable=true在nohup.out中看到success字样说明broker就启动成功了。 使用jps命令可以看到两个java的应用正在运行:1
2
3
4
5
6
7[root@localhost rocketmq-all-4.9.8-bin-release]# nohup bin/mqbroker &
[2] 9252
[root@localhost rocketmq-all-4.9.8-bin-release]# nohup: ignoring input and appending output to ‘nohup.out’
^C
[root@localhost rocketmq-all-4.9.8-bin-release]# view nohup.out
The broker[localhost.localdomain, 192.168.30.3:10911] boot success. serializeType=JSON1
2
39348 Jps
9206 NamesrvStartup
9275 BrokerStartup
在实际部署时,可以和安装jdk一样把RocketMQ的bin目录添加到环境变量中,这样就可以直接使用mq的命令了。
1 | echo 'export ROCKETMQ_HOME=/app/rocketmq-all-4.9.8-bin-release' | sudo tee -a /etc/profile |
实现消息收发
执行命令添加环境变量
1
2echo 'export NAMESRV_ADDR=localhost:9876' | sudo tee -a /etc/profile
source /etc/profile启动RocketMQ生产者发送消息
这个指令会默认往mq发送1000条消息,执行后从命令行可以看到发送消息的日志:1
tools.sh org.apache.rocketmq.example.quickstart.Producer
可以看到最后两行表示发送完成之后服务正常关闭。1
2
3
4
5
6...
SendResult [sendStatus=SEND_OK, msgId=7F000001085F72EA2F775F87D09E03E5, offsetMsgId=C0A81E0300002A9F000000000002EB52, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=1], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=7F000001085F72EA2F775F87D09F03E6, offsetMsgId=C0A81E0300002A9F000000000002EC12, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=2], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=7F000001085F72EA2F775F87D0A003E7, offsetMsgId=C0A81E0300002A9F000000000002ECD2, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=3], queueOffset=249]
13:12:16.338 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.30.3:10911] result: true
13:12:16.342 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true启动消费者接收之前发送的消息
消费者启动完成后,可以看到消费到的消息:1
tools.sh org.apache.rocketmq.example.quickstart.Consumer
每1条这样的日志信息就表示消费者接收到了1条消息。这个Consumer消费者的指令并不会主动结束,他会继续挂起,等待消费新的消息。我们可以使用CTRL+C停止该进程。 从打印的消息体中,我们可以看到很多消息、队列以及broker相关的信息。1
2
3
4[root@localhost rocketmqlogs]# tools.sh org.apache.rocketmq.example.quickstart.Consumer
Consumer Started.
ConsumeMessageThread_please_rename_unique_group_name_4_1 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=0, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1721365933162, bornHost=/192.168.30.3:33580, storeTimestamp=1721365933235, storeHost=/192.168.30.3:10911, msgId=C0A81E0300002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721366213429, UNIQ_KEY=7F000001085F72EA2F775F87C4650000, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
...
使用Java客户端
搭建一个标准的maven项目,pom.xml中引入如下依赖:
1
2
3
4
5<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.8</version>
</dependency>然后创建一个生产者类,编写如下代码:
这边记得修改nameserver的地址,我这里使用的是虚拟机,所以需要打开虚拟机9876和10911端口,9876是nameserver的端口,10911是broker的端口,如果只打开9876会导致客户端连接到nameserver之后根据nameserver返回的broker地址去建立连接时无法连接。 执行main方法后,可以看到控制台输出发送的结果:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class BasicProducer {
public static void main(String[] args) {
// 初始化一个消息生产者
DefaultMQProducer producer = new DefaultMQProducer("basic_producer_group_name");
producer.setNamesrvAddr("192.168.30.3:9876");
try {
// 启动生产者
producer.start();
// 创建消息对象
Message msg = new Message("TopicTest", "basic_tag_A", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
producer.shutdown();
}
}
}1
SendResult [sendStatus=SEND_OK, msgId=7F0000014D9C02E5C6495FBCE8900000, offsetMsgId=C0A81E0300002A9F000000000002ED92, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=250]
创建⼀个消息消费者接收RocketMQ中的消息
执行main方法后就可以接受到刚刚发送的消息:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public class BasicConsumer {
public static void main(String[] args) {
// 构建⼀个消息消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("basic_consumer_group_name");
// 指定nameserver地址
consumer.setNamesrvAddr("192.168.30.3:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
try {
// 订阅⼀个消息队列
consumer.subscribe("basic_topic", "*");
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, consumeConcurrentlyContext) -> {
messageExtList.forEach(messageExt -> {
System.out.println(new String(messageExt.getBody()));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者
consumer.start();
}
catch (Exception e) {
e.printStackTrace();
}
}
}这样我们就完成一个最简单的生产消费流程。这些代码在RocketMQ的官网以及源码的example模块中都可以看到。1
Hello RocketMQ
搭建可视化管理服务
RocketMQ都是以后台服务的形式在运行,我们并不很清楚RocketMQ是如何运作的。 RocketMQ的社区就提供了一个图形化的管理控制台Dashboard,可以通过可视化的形式直接观测并管理RocketMQ的运行过程。
在RocketMQ的官网上,找到Dashboard下载源码自己使用maven打包或者使用docker安装。 我们这里有工具就下载源码,源码地址。
下载完成之后用idea打开,在maven菜单中右击package命令,点击Modify Run Configuration个性化执行命令,在package后面加上-Dmaven.test.skip=true
并执行,打包后文件为rocketmq-dashboard-1.0.1-SNAPSHOT.jar
。也可以直接使用maven命令行mvn clean package -Dmaven.test.skip=true
进行打包。
将jar包上传到服务器上之后,在jar包所在目录创建一个application.yml
文件,其中配置一下nameserver的地址:
1 | rocketmq: |
rocketmq-dashboard\src\main\resources\application.yml
的配置进行参考。配置完成之后执行java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar
运行该springboot工程,然后就可以通过8080端口进行访问了。
这个管理控制台的功能非常全面。dashboard菜单展示RocketMQ近期的运⾏情况,运维ops菜单主要是管理nameserver服务,集群cluster菜单主要管理broker服务。
分布式集群
前面我们使用了一台服务器搭建了nameserver+broker+dashboard的一整套RocketMQ服务,但是很明显生产系统我们需要做好高可用,防止单点故障。
RocketMQ的分布式集群基于主从架构,在多个服务器组成的集群中,指定一部分节点作为Master节点,负责相应客户端请求。另一部分节点作为Slave节点,负责备份Master节点上的数据,这样当Master节点出现故障时,在Slave节点上可以保留有数据备份,至少保证数据不会丢失。
我们目标是搭建一个由三个节点组成的Nameserver和2主2从的Broker集群,为了更方便的表示各个服务器,我把三个虚拟机的ip指定一个机器名如下:
1 | view /etc/hosts |
机器名 | Nameserver服务 | Broker部署 |
---|---|---|
node1 | Nameserver | |
node2 | Nameserver | broker-a, broker-b-slave |
node3 | Nameserver | broker-b, broker-a-slave |
部署Nameserver
部署Nameserver和之前一样,在三台机器上都启动nameserver服务即可
部署Broker
broker服务需要修改集群相关的参数。配置文件在conf目录下有示例:
2m-noslave:2主无从的集群配置
2m-2s-async:2主2从的集群配置。其中async/sync表示主节点与从节点之间是异步同步还是同步同步。
dledger:具备主从切换功能的高可用集群。集群中的节点会基于Raft协议随机选举中一个Leader,作用类似于Master节点。其他节点都是Follower,作用类似于Slave节点。
本次我们就采用2m-2s-async
的方式搭建集群,需要修改node2和node3下的配置文件。
配置第一组broker-a的主从服务 在node2机器上配置broker-a的master服务,需要修改
conf/2m-2s-async/broker-a.properties
示例如下:在node3机器上配置broker-a的salve服务,需要修改1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=DefaultCluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=node1:9876;node2:9876;node3:9876
#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
deleteWhen=04
#消息存储时间
fileReservedTime=48
#Broker的角色
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
#存储路径
storePathRootDir=/app/rocketmq/store
storePathCommitLog=/app/rocketmq/store/commitlog
storePathConsumeQueue=/app/rocketmq/store/consumequeue
storePathIndex=/app/rocketmq/store/index
storeCheckpoint=/app/rocketmq/store/checkpoint
abortFile=/app/rocketmq/store/abort
#Broker 对外服务的监听端口
listenPort=10911conf/2m-2s-async/broker-a-s.properties
示例如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=DefaultCluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=node1:9876;node2:9876;node3:9876
#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
deleteWhen=04
#消息存储时间
fileReservedTime=48
#Broker的角色
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
#存储路径
storePathRootDir=/app/rocketmq/storeSlave
storePathCommitLog=/app/rocketmq/storeSlave/commitlog
storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue
storePathIndex=/app/rocketmq/storeSlave/index
storeCheckpoint=/app/rocketmq/storeSlave/checkpoint
abortFile=/app/rocketmq/storeSlave/abort
#Broker 对外服务的监听端口
listenPort=11011配置第二组broker-b的主从服务 在node3机器上配置broker-b的master服务,需要修改
conf/2m-2s-async/broker-b.properties
示例如下:在node2机器上配置broker-b的slave服务,需要修改1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=DefaultCluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=node1:9876;node2:9876;node3:9876
#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
deleteWhen=04
#消息存储时间
fileReservedTime=48
#Broker的角色
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
#存储路径
storePathRootDir=/app/rocketmq/store
storePathCommitLog=/app/rocketmq/store/commitlog
storePathConsumeQueue=/app/rocketmq/store/consumequeue
storePathIndex=/app/rocketmq/store/index
storeCheckpoint=/app/rocketmq/store/checkpoint
abortFile=/app/rocketmq/store/abort
#Broker 对外服务的监听端口
listenPort=10911conf/2m-2s-async/broker-b-s.properties
示例如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=DefaultCluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=node1:9876;node2:9876;node3:9876
#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
deleteWhen=04
#消息存储时间
fileReservedTime=48
#Broker的角色
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
#存储路径
storePathRootDir=/app/rocketmq/storeSlave
storePathCommitLog=/app/rocketmq/storeSlave/commitlog
storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue
storePathIndex=/app/rocketmq/storeSlave/index
storeCheckpoint=/app/rocketmq/storeSlave/checkpoint
abortFile=/app/rocketmq/storeSlave/abort
#Broker 对外服务的监听端口
listenPort=11011启动Broker服务 在node2节点上启动broker-a和broker-b-s:
在node3节点上启动broker-a-s和broker-b:1
2nohup mqbroker -c /app/rocketmq-all-4.9.8-bin-release/conf/2m-2s-async/broker-a.properties &
nohup mqbroker -c /app/rocketmq-all-4.9.8-bin-release/conf/2m-2s-async/broker-b-s.properties &1
2nohup mqbroker -c /app/rocketmq-all-4.9.8-bin-release/conf/2m-2s-async/broker-b.properties &
nohup mqbroker -c /app/rocketmq-all-4.9.8-bin-release/conf/2m-2s-async/broker-a-s.properties &检查集群服务状态 再启动完成之后,可以通过mq的命令行命令查看集群状态,bin文件夹下有mqadmin指令,执⾏这个指令需要在机器上配置了NAMESRV环境变量。通过该指令查看RocketMQ集群状态:
mqadmin还有很多其他功能,比如新增Topic,查询消息等等,具体可以直接执行1
2
3
4
5
6[root@localhost 2m-2s-async]# mqadmin clusterList
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
DefaultCluster broker-a 0 192.168.30.4:10911 V4_9_8 0.00(0,0ms) 0.00(0,0ms) 0 478233.88 0.1400
DefaultCluster broker-a 1 192.168.30.5:11011 V4_9_8 0.00(0,0ms) 0.00(0,0ms) 0 478233.88 0.1400
DefaultCluster broker-b 0 192.168.30.5:10911 V4_9_8 0.00(0,0ms) 0.00(0,0ms) 0 478233.88 0.1400
DefaultCluster broker-b 1 192.168.30.4:11011 V4_9_8 0.00(0,0ms) 0.00(0,0ms) 0 478233.88 0.1400mqadmin
并使用mqadmin help [COMMAND]
来查看具体功能。当然我们之前搭建的dashboard会用起来更加清晰,我们这里就启动我们的dashboard来查看集群信息。
注意配置dashboard的Nameserver需要加上集群的Nameserver地址:
1 | rocketmq: |
在RocketMQ的这种主从架构的集群下,客户端发送的消息会分散保存到broker-a和broker-b两个服务上,然后每个服务都配有slave服务,可以备份对应master服务上的消息,这样就可以防⽌单点故障造成的消息丢失问题。
升级高可用集群
主从架构的RocketMQ集群给master节点配置了slave节点作为备份,可以保证broker上的消息不会丢失,但是master节点宕机后,消息就不再会往该broker发送,并且也无法从该broker的slave节点中读取出消息,只有master重启后才能正常被消费者读取。也就是说slave节点只能保证数据不丢失,但是无法保证服务高可用。
这时我们就希望集群能够自动将slave节点升级为master节点继续对外提供服务,这样整个集群的消息服务不会中断。而RocketMQ的Dledger集群就具备这种角色转换的功能。
在Dledger集群中,就不再单独指定各个broker的服务,而是由这些broker服务自己进行选举,产生1个Leader节点的服务,响应客户端的各种请求。其他的broker服务就作为Follower节点,负责对Leader上的数据进行备份。当然,Follower所要负责的事情相比主从架构中的SLAVE节点会要复杂一点,因为这种节点选举是在后端不断进行的,他们需要随时做好升级成Leader的准备。
接下来我们就使用之前的3台服务器来搭建一个3个节点的Dledger集群,这个集群中只要有2台服务正常运行,这个集群就能正常工作。
部署nameserver 这一步可以使用之前的nameserver,实际上nameserver是会自动感知broker的变化的,搭建的时候都不需要重新启动。
对Broker服务进行配置 和主从结构一样,RocketMQ给出了搭建Dledger集群的配置样例,在
conf/dledger
目录下RocketMQ给出了3个配置文件,这3个文件可以在单机上直接部署成一个具有3个服务的Dledger集群,我们按照此配置进行略微修改。 对于node1的broker.conf
配置如下:其中和Dledger集群相关的参数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15brokerClusterName=RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=node1:9876;node2:9876;node3:9876
storePathRootDir=/app/rocketmq/storeDledger/
storePathCommitLog=/app/rocketmq/storeDledger/commitlog storePathConsumeQueue=/app/rocketmq/storeDledger/consumequeue
storePathIndex=/app/rocketmq/storeDledger/index
storeCheckpoint=/app/rocketmq/storeDledger/checkpoint
abortFile=/app/rocketmq/storeDledger/abort
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-node1:40911;n1-node2:40911;n2-node3:40911
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
- dLedgerGroup: Dledger Raft Group的名字,建议跟brokerName保持⼀致。
- dLedgerPeers: Dledger Group内各个服务节点的地址及端口信息。同一个Group内的各个节点配置必须要保持一致。
- dLedgerSelfId: Dledger节点ID,必须属于dLedgerPeers中的一个。同一个Group内的各个节点必须不能重复。 node2和node3就只需要把对应的
n0
修改为n1
和n2
即可。
- 启动服务启动完成之后我们使用
1
nohup mqbroker -c /app/rocketmq-all-4.9.8-bin-release/conf/dledger/broker.conf &
mqadmin
查看集群状态:此时node3机器上的broker被选成了master(id为0,这个id和配置的dLedgerSelfId没有关联),如果此时关闭node3机器上的broker,就会发现node1和node2中被随机出来一个节点作为了master。如果继续关闭一个服务器上的broker,那么集群会因为可用服务少于半数而无法正常工作。1
2
3
4
5[root@localhost dledger]# mqadmin clusterList
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
RaftCluster RaftNode00 0 192.168.30.5:30911 V4_9_8 0.00(0,0ms) 0.00(0,0ms) 0 478236.00 0.1400
RaftCluster RaftNode00 1 192.168.30.3:30911 V4_9_8 0.00(0,0ms) 0.00(0,0ms) 0 478236.00 0.1400
RaftCluster RaftNode00 2 192.168.30.4:30911 V4_9_8 0.00(0,0ms) 0.00(0,0ms) 0 478236.00 0.1500
Dledger集群机制是RocketMQ4.5版本开始⽀持的⼀个重要特性。他其实是由OpenMessage组织带入RocketMQ的一个系列框架。他是一个为高可用、高性能、高可靠的分布式存储系统提供基础支持的组件。他做的事情主要有两个,一是在集群中选举产生master节点。RocketMQ集群需要用这个master节点响应客户端的各种请求。二是在各种复杂的分布式场景下,保证CommitLog日志文件在集群中的强一致性。
Dledger集群背后的核心就是Raft协议。这是一种强大的分布式选举算法,其核⼼是只要集群中超过半数的节点作出的共同决议,就认为是集群最终的共同决议。Kafka也在之后基于Raft协议,自行实现了Kraft集群机制。
Dledger集群由于会接管RocketMQ原生的文件写入,所以,Dledger集群的文件写入速度比RocketMQ的原生写入方式是要慢一点的。这会对RocketMQ的性能产生一些影响。所以,当前版本的Dledger集群在企业中用得并不是太多。5.0版本对Dledger集群抽出了一种Dledger Controller模式,也就是只用Dledger集群的选举功能,而不用他的Commit文件写入功能,这样性能可以得到一定的提升。
总结RocketMQ的运行架构
通过上面简单的实验,我们已经对RocketMQ的运行机制有了一个大概的了解。接下来我们梳理一下RocketMQ中各个组件的作用:
- nameServer服务 nameServer不依靠任何其他服务自己就能独立启动。并且不管是broker还是客户端,都需要明确指定nameServer地址。整个RocketMQ集群都要在nameServer的协调下才能正常工作。
- broker服务 broker服务是整个集群中设计最为繁琐的部分,最核心的消息存储、传递、查询等功能都由Broker来提供。
- client客户端 client客户端包括生产者和消费者。从生产者产生的数据经过nameServer的调度在broker中进行分发给对应的消费者。
理解RocketMQ的消息模型
在学习消息模型之前,我们先往之前的两个节点(2m-2s)的集群发送一些消息。
1 | tools.sh org.apache.rocketmq.example.quickstart.Producer |
1 | SendResult [sendStatus=SEND_OK, msgId=7F000001097A72EA2F77708EA51E03E7, offsetMsgId=C0A81E0500002A9F0000000000017608, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=1], queueOffset=124] |
TopicTest
的主题,这就是运行测试类所创建的主题。点击【状态】可以看到主题上的消息分布。可以看到broker-a和broker-b上各有4个队列,每一个队列分有125条消息,其中offset可以理解和Kafka一样是消息的索引,最大offset就是当前最新接收到消息保存的位置。回头来看之前日志中打印的SendResult的信息。日志中的MessageQueue就代表这一条消息存在哪个队列上了。而queueOffset就表示这条消息记录在MessageQueue的哪个位置。
我们再启动一个消费者来消费消息:
1 | tools.sh org.apache.rocketmq.example.quickstart.Consumer |
Consumer启动完成后,我们可以在控制台看到很多类似这样的⽇志:
1 | ConsumeMessageThread_please_rename_unique_group_name_4_1 Receive New Messages: [MessageExt [brokerName=broker-a3, storeSize=192, queueOffset=47, sysFlag=0, bornTimestamp=1721651595296, bornHost=/192.168.30.3:33990, storeTi21651595170, storeHost=/192.168.30.4:10911, msgId=C0A81E0400002A9F0000000000008F0A, commitLogOffset=36618, body7070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={=0, MAX_OFFSET=125, CONSUME_START_TIME=1721651995917, UNIQ_KEY=7F000001097A72EA2F77708EA020017D, CLUSTER=DefaulTAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51, 56, 49], transactionI] |
我们可以从dashboard的【消费者】菜单中验证一下消费情况,点击对应消费者后,可以看到从中看到消费了TopicTest主题的消息,并且启动了一个叫做please_rename_unique_group_name_4
的消费者组,并且消费都是以组消费的,我们可以自定义一个新的消费组来消费这个主题的信息。这时RocketMQ就会单独记录新消费组的消费进度,而新的消费组也可以消费到TopicTest的全部消息。
对上面的现象进行梳理,我们可以得出模型。生产者和消费者都可以指定一个Topic发送消息或者拉去消息,而Topic是一个逻辑概念,Topic中的消息被分布在后面多个MessageQueue中,这些MessageQueue又会分布在一个或者多个broker上,这一点和Kafka非常相似。
在RocketMQ的这个消息模型当中,最为核心的就是Topic。对于客户端,Topic代表了一类有相同业务规则的消息。对于Broker,Topic则代表了系统中一系列存储消息的资源。所以,RocketMQ对于Topic是需要做严格管理的。如果任由客户端随意创建Topic,那么服务端的资源管理压力就会非常大。默认情况下,Topic都需要由管理员在RocketMQ的服务端手动进行创建,然后才能给客户端使用的。而我们之前在broker.conf中手动添加的autoCreateTopic=true
,就是表示可以由客户端自行创建Topic。这种配置方式显然只适用于测试环境,在生产环境不建议打开这个配置项。
对于业务来说,最为重要的就是消息Message了。生产者发送到某一个Topic下的消息,最终会保存在Topic下的某一个MessageQueue中。在消费者来消费消息时,RocketMQ会在Broker端给每个消费者组记录一个消息的消费位点Offset。通过Offset控制每个消费者组的消息处理进度。这样,每一条消息,在一个消费者组当中只被处理一次。
小结
这一章节,主要是快速熟悉RocketMQ产品,并通过操作,理解总结RocketMQ的运行架构以及消息模型。这些抽象的模型和架构和Kafka有很多相似之处,可以与Kafka进行对比学习。当然,RocketMQ相比Kafka拥有更多业务功能,能够满足各类业务场景。