RocketMQ使用指南1——简介和集群部署


目录

  1. RocketMQ简介和集群架构与原理(本篇)

  2. 理解RocketMQ的消息模型并与Spring集成

  3. RocketMQ源码解析

  4. RocketMQ生产环境常见问题总结

主要内容

这一部分主要是了解RocketMQ和熟悉RocketMQ的部署。从安装搭建到部署监控平台,可以快速上手这一优秀的消息组件。

产品介绍

RocketMQ是阿里开源的一个消息中间件,在阿里内部经历了双十一的考验,能够处理亿万级别的消息。早起使用的ActiveMQ在消息量逐渐增多之后性能很快达到了瓶颈,而Kafka是针对日志收集等高并发场景设计的,不适用于多Topic的场景。所以阿里在吸取Kafka的优点后,自研了一个消息中间件产品并不断优化,促成了一个Apache顶级项目的诞生。

产品特点

当今MQ产品众多,其中影响力和使用范围最大的当属Kafka、RabbitMQ、RocketMQ和Pulsar。这几个产品在设计和实现上各有差异,造成他们适合于不用的细分场景。

优点缺点适合场景
Kafka吞吐量非常大,性能好,集群高可用有丢数据可能,不适合大量topic,功能单一日志分析、大数据采集
RabbitMQ可靠性高,支持AMQP协议,功能全面erlang语言不好扩展,吞吐量一般企业内部小规模服务调用
Pulsar云原生计算与存储分离,支持跨数据中心容灾,支持多租户新事物生态较少日志分析、大数据采集、跨地区容灾
RocketMQ吞吐量高,功能全面,使用Java开发方便定制,丰富的管理工具服务加载较慢适合多种场景

快速搭建

搭建RocketMQ服务

RocketMQ官网地址为:https://rocketmq.apache.org/ ,下载地址为https://rocketmq.apache.org/download/。可以看到目前最新版本为5.3.0,而4.x的最后一个版本4.9.8已经于2024年3月停止维护,本次学习的还是4.x版本的,后续再会去学习5.x的新特性。

我们下载下来运行版本之后就可以上传到我们的服务器上并解压,解压后有几个重要的目录:

1
2
3
4
5
6
7
8
9
10
[root@localhost ~]# cd /app/rocketmq-all-4.9.8-bin-release
[root@localhost rocketmq-all-4.9.8-bin-release]# ls -l
total 44
drwxr-xr-x. 2 root root 126 Feb 22 18:24 benchmark #压测脚本
drwxr-xr-x. 3 root root 4096 Feb 22 17:26 bin #执行脚本
drwxr-xr-x. 6 root root 211 Feb 22 17:26 conf #配置文件
drwxr-xr-x. 2 root root 4096 Feb 22 18:24 lib #运行jar包
-rw-r--r--. 1 root root 17327 Feb 22 17:26 LICENSE
-rw-r--r--. 1 root root 1338 Feb 22 17:26 NOTICE
-rw-r--r--. 1 root root 11219 Feb 22 17:26 README.md

RocketMQ建议的运行环境为至少12G内存,但是我们这里学习阶段,虚拟机里面就给1G吧。此处需要修改bin目录下runserver.shrunbroker.sh两个脚本。 使用vi runserver.sh命令,找到下面的配置并调整jvm的内存大小。

1
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

使用vi runbroker.sh命令,找到下面的配置并调整jvm的内存大小。

1
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"

然后在安装完jdk之后,RocketMQ就可以启动了。

  1. 启动nameserver服务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    [root@localhost rocketmq-all-4.9.8-bin-release]# nohup bin/mqnamesrv &
    [1] 9187
    [root@localhost rocketmq-all-4.9.8-bin-release]# nohup: ignoring input and appending output to ‘nohup.out’
    ^C
    [root@localhost rocketmq-all-4.9.8-bin-release]# view nohup.out

    OpenJDK 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
    OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
    The Name Server boot success. serializeType=JSON
    在nohup.out中看到success字样说明nameserver就启动成功了。 使用jps命令可以看到一个java的应用正在运行:
    1
    2
    9348 Jps
    9206 NamesrvStartup

  2. 启动broker服务 首先修改broker.conf中的一个配置,可以让broker自动创建topic。

    1
    2
    3
    [root@localhost rocketmq-all-4.9.8-bin-release]# vi conf/broker.conf
    #在配置文件的最后添加一行
    autoCreateTopicEnable=true
    另外,如果你的服务器配置了多张网卡,例如阿里云,腾讯云这样的云服务器,他们通常有内网网卡和外网网卡两张网卡,那么需要增加配置brokerIP1属性,指向服务器的外网IP地址,这样才能确保从其他服务器上访问到RocketMQ服务。 和启动nameserver一样启动broker服务:
    1
    2
    3
    4
    5
    6
    7
    [root@localhost rocketmq-all-4.9.8-bin-release]# nohup bin/mqbroker &
    [2] 9252
    [root@localhost rocketmq-all-4.9.8-bin-release]# nohup: ignoring input and appending output to ‘nohup.out’
    ^C
    [root@localhost rocketmq-all-4.9.8-bin-release]# view nohup.out

    The broker[localhost.localdomain, 192.168.30.3:10911] boot success. serializeType=JSON
    在nohup.out中看到success字样说明broker就启动成功了。 使用jps命令可以看到两个java的应用正在运行:
    1
    2
    3
    9348 Jps
    9206 NamesrvStartup
    9275 BrokerStartup

在实际部署时,可以和安装jdk一样把RocketMQ的bin目录添加到环境变量中,这样就可以直接使用mq的命令了。

1
2
3
4
5
echo 'export ROCKETMQ_HOME=/app/rocketmq-all-4.9.8-bin-release' | sudo tee -a /etc/profile
echo 'export PATH=$PATH:$ROCKETMQ_HOME/bin' | sudo tee -a /etc/profile

nohup mqnamesrv &
nohup mqbroker &

实现消息收发

  1. 执行命令添加环境变量

    1
    2
    echo 'export NAMESRV_ADDR=localhost:9876' | sudo tee -a /etc/profile
    source /etc/profile

  2. 启动RocketMQ生产者发送消息

    1
    tools.sh org.apache.rocketmq.example.quickstart.Producer
    这个指令会默认往mq发送1000条消息,执行后从命令行可以看到发送消息的日志:
    1
    2
    3
    4
    5
    6
    ...
    SendResult [sendStatus=SEND_OK, msgId=7F000001085F72EA2F775F87D09E03E5, offsetMsgId=C0A81E0300002A9F000000000002EB52, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=1], queueOffset=249]
    SendResult [sendStatus=SEND_OK, msgId=7F000001085F72EA2F775F87D09F03E6, offsetMsgId=C0A81E0300002A9F000000000002EC12, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=2], queueOffset=249]
    SendResult [sendStatus=SEND_OK, msgId=7F000001085F72EA2F775F87D0A003E7, offsetMsgId=C0A81E0300002A9F000000000002ECD2, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=3], queueOffset=249]
    13:12:16.338 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.30.3:10911] result: true
    13:12:16.342 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
    可以看到最后两行表示发送完成之后服务正常关闭。

  3. 启动消费者接收之前发送的消息

    1
    tools.sh org.apache.rocketmq.example.quickstart.Consumer
    消费者启动完成后,可以看到消费到的消息:
    1
    2
    3
    4
    [root@localhost rocketmqlogs]# tools.sh org.apache.rocketmq.example.quickstart.Consumer
    Consumer Started.
    ConsumeMessageThread_please_rename_unique_group_name_4_1 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=0, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1721365933162, bornHost=/192.168.30.3:33580, storeTimestamp=1721365933235, storeHost=/192.168.30.3:10911, msgId=C0A81E0300002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721366213429, UNIQ_KEY=7F000001085F72EA2F775F87C4650000, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
    ...
    每1条这样的日志信息就表示消费者接收到了1条消息。这个Consumer消费者的指令并不会主动结束,他会继续挂起,等待消费新的消息。我们可以使用CTRL+C停止该进程。 从打印的消息体中,我们可以看到很多消息、队列以及broker相关的信息。

使用Java客户端

  1. 搭建一个标准的maven项目,pom.xml中引入如下依赖:

    1
    2
    3
    4
    5
    <dependency>  
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.8</version>
    </dependency>

  2. 然后创建一个生产者类,编写如下代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public class BasicProducer {  
    public static void main(String[] args) {
    // 初始化一个消息生产者
    DefaultMQProducer producer = new DefaultMQProducer("basic_producer_group_name");
    producer.setNamesrvAddr("192.168.30.3:9876");
    try {
    // 启动生产者
    producer.start();
    // 创建消息对象
    Message msg = new Message("TopicTest", "basic_tag_A", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
    // 发送消息
    SendResult sendResult = producer.send(msg);
    System.out.printf("%s%n", sendResult);
    } catch (Exception e) {
    throw new RuntimeException(e);
    } finally {
    producer.shutdown();
    }
    }
    }
    这边记得修改nameserver的地址,我这里使用的是虚拟机,所以需要打开虚拟机9876和10911端口,9876是nameserver的端口,10911是broker的端口,如果只打开9876会导致客户端连接到nameserver之后根据nameserver返回的broker地址去建立连接时无法连接。 执行main方法后,可以看到控制台输出发送的结果:
    1
    SendResult [sendStatus=SEND_OK, msgId=7F0000014D9C02E5C6495FBCE8900000, offsetMsgId=C0A81E0300002A9F000000000002ED92, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=250]

  3. 创建⼀个消息消费者接收RocketMQ中的消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class BasicConsumer {  
    public static void main(String[] args) {
    // 构建⼀个消息消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("basic_consumer_group_name");
    // 指定nameserver地址
    consumer.setNamesrvAddr("192.168.30.3:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    try {
    // 订阅⼀个消息队列
    consumer.subscribe("basic_topic", "*");
    // 注册消息监听器
    consumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, consumeConcurrentlyContext) -> {
    messageExtList.forEach(messageExt -> {
    System.out.println(new String(messageExt.getBody()));
    });
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    // 启动消费者
    consumer.start();
    }
    catch (Exception e) {
    e.printStackTrace();
    }
    }
    }
    执行main方法后就可以接受到刚刚发送的消息:
    1
    Hello RocketMQ
    这样我们就完成一个最简单的生产消费流程。这些代码在RocketMQ的官网以及源码的example模块中都可以看到。

搭建可视化管理服务

RocketMQ都是以后台服务的形式在运行,我们并不很清楚RocketMQ是如何运作的。 RocketMQ的社区就提供了一个图形化的管理控制台Dashboard,可以通过可视化的形式直接观测并管理RocketMQ的运行过程。

在RocketMQ的官网上,找到Dashboard下载源码自己使用maven打包或者使用docker安装。 我们这里有工具就下载源码,源码地址

下载完成之后用idea打开,在maven菜单中右击package命令,点击Modify Run Configuration个性化执行命令,在package后面加上-Dmaven.test.skip=true并执行,打包后文件为rocketmq-dashboard-1.0.1-SNAPSHOT.jar。也可以直接使用maven命令行mvn clean package -Dmaven.test.skip=true进行打包。

将jar包上传到服务器上之后,在jar包所在目录创建一个application.yml文件,其中配置一下nameserver的地址:

1
2
3
4
rocketmq: 
config:
namesrvAddrs:
- 192.168.30.3:9876
其他配置可以直接查看源码中rocketmq-dashboard\src\main\resources\application.yml的配置进行参考。

配置完成之后执行java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar运行该springboot工程,然后就可以通过8080端口进行访问了。

这个管理控制台的功能非常全面。dashboard菜单展示RocketMQ近期的运⾏情况,运维ops菜单主要是管理nameserver服务,集群cluster菜单主要管理broker服务。

分布式集群

前面我们使用了一台服务器搭建了nameserver+broker+dashboard的一整套RocketMQ服务,但是很明显生产系统我们需要做好高可用,防止单点故障。

RocketMQ的分布式集群基于主从架构,在多个服务器组成的集群中,指定一部分节点作为Master节点,负责相应客户端请求。另一部分节点作为Slave节点,负责备份Master节点上的数据,这样当Master节点出现故障时,在Slave节点上可以保留有数据备份,至少保证数据不会丢失。

我们目标是搭建一个由三个节点组成的Nameserver和2主2从的Broker集群,为了更方便的表示各个服务器,我把三个虚拟机的ip指定一个机器名如下:

1
2
3
4
view /etc/hosts
192.168.30.3 node1
192.168.30.4 node2
192.168.30.5 node3
对应的部署情况如下:

机器名Nameserver服务Broker部署
node1Nameserver
node2Nameserverbroker-a, broker-b-slave
node3Nameserverbroker-b, broker-a-slave

部署Nameserver

部署Nameserver和之前一样,在三台机器上都启动nameserver服务即可

部署Broker

broker服务需要修改集群相关的参数。配置文件在conf目录下有示例:

  • 2m-noslave:2主无从的集群配置

  • 2m-2s-async:2主2从的集群配置。其中async/sync表示主节点与从节点之间是异步同步还是同步同步。

  • dledger:具备主从切换功能的高可用集群。集群中的节点会基于Raft协议随机选举中一个Leader,作用类似于Master节点。其他节点都是Follower,作用类似于Slave节点。

本次我们就采用2m-2s-async的方式搭建集群,需要修改node2和node3下的配置文件。

  1. 配置第一组broker-a的主从服务 在node2机器上配置broker-a的master服务,需要修改conf/2m-2s-async/broker-a.properties 示例如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    #所属集群名字,名字一样的节点就在同一个集群内
    brokerClusterName=DefaultCluster
    #broker名字,名字一样的节点就是一组主从节点。
    brokerName=broker-a
    #brokerid,0就表示是Master,>0的都是表示 Slave
    brokerId=0
    #nameServer地址,分号分割
    namesrvAddr=node1:9876;node2:9876;node3:9876
    #是否允许Broker自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    deleteWhen=04
    #消息存储时间
    fileReservedTime=48
    #Broker的角色
    brokerRole=ASYNC_MASTER
    flushDiskType=ASYNC_FLUSH
    #存储路径
    storePathRootDir=/app/rocketmq/store
    storePathCommitLog=/app/rocketmq/store/commitlog
    storePathConsumeQueue=/app/rocketmq/store/consumequeue
    storePathIndex=/app/rocketmq/store/index
    storeCheckpoint=/app/rocketmq/store/checkpoint
    abortFile=/app/rocketmq/store/abort
    #Broker 对外服务的监听端口
    listenPort=10911
    在node3机器上配置broker-a的salve服务,需要修改conf/2m-2s-async/broker-a-s.properties 示例如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    #所属集群名字,名字一样的节点就在同一个集群内
    brokerClusterName=DefaultCluster
    #broker名字,名字一样的节点就是一组主从节点。
    brokerName=broker-a
    #brokerid,0就表示是Master,>0的都是表示 Slave
    brokerId=1
    #nameServer地址,分号分割
    namesrvAddr=node1:9876;node2:9876;node3:9876
    #是否允许Broker自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    deleteWhen=04
    #消息存储时间
    fileReservedTime=48
    #Broker的角色
    brokerRole=SLAVE
    flushDiskType=ASYNC_FLUSH
    #存储路径
    storePathRootDir=/app/rocketmq/storeSlave
    storePathCommitLog=/app/rocketmq/storeSlave/commitlog
    storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue
    storePathIndex=/app/rocketmq/storeSlave/index
    storeCheckpoint=/app/rocketmq/storeSlave/checkpoint
    abortFile=/app/rocketmq/storeSlave/abort
    #Broker 对外服务的监听端口
    listenPort=11011

  2. 配置第二组broker-b的主从服务 在node3机器上配置broker-b的master服务,需要修改conf/2m-2s-async/broker-b.properties 示例如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    #所属集群名字,名字一样的节点就在同一个集群内
    brokerClusterName=DefaultCluster
    #broker名字,名字一样的节点就是一组主从节点。
    brokerName=broker-b
    #brokerid,0就表示是Master,>0的都是表示 Slave
    brokerId=0
    #nameServer地址,分号分割
    namesrvAddr=node1:9876;node2:9876;node3:9876
    #是否允许Broker自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    deleteWhen=04
    #消息存储时间
    fileReservedTime=48
    #Broker的角色
    brokerRole=ASYNC_MASTER
    flushDiskType=ASYNC_FLUSH
    #存储路径
    storePathRootDir=/app/rocketmq/store
    storePathCommitLog=/app/rocketmq/store/commitlog
    storePathConsumeQueue=/app/rocketmq/store/consumequeue
    storePathIndex=/app/rocketmq/store/index
    storeCheckpoint=/app/rocketmq/store/checkpoint
    abortFile=/app/rocketmq/store/abort
    #Broker 对外服务的监听端口
    listenPort=10911
    在node2机器上配置broker-b的slave服务,需要修改conf/2m-2s-async/broker-b-s.properties 示例如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    #所属集群名字,名字一样的节点就在同一个集群内
    brokerClusterName=DefaultCluster
    #broker名字,名字一样的节点就是一组主从节点。
    brokerName=broker-b
    #brokerid,0就表示是Master,>0的都是表示 Slave
    brokerId=1
    #nameServer地址,分号分割
    namesrvAddr=node1:9876;node2:9876;node3:9876
    #是否允许Broker自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    deleteWhen=04
    #消息存储时间
    fileReservedTime=48
    #Broker的角色
    brokerRole=SLAVE
    flushDiskType=ASYNC_FLUSH
    #存储路径
    storePathRootDir=/app/rocketmq/storeSlave
    storePathCommitLog=/app/rocketmq/storeSlave/commitlog
    storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue
    storePathIndex=/app/rocketmq/storeSlave/index
    storeCheckpoint=/app/rocketmq/storeSlave/checkpoint
    abortFile=/app/rocketmq/storeSlave/abort
    #Broker 对外服务的监听端口
    listenPort=11011

  3. 启动Broker服务 在node2节点上启动broker-a和broker-b-s:

    1
    2
    nohup mqbroker -c /app/rocketmq-all-4.9.8-bin-release/conf/2m-2s-async/broker-a.properties &
    nohup mqbroker -c /app/rocketmq-all-4.9.8-bin-release/conf/2m-2s-async/broker-b-s.properties &
    在node3节点上启动broker-a-s和broker-b:
    1
    2
    nohup mqbroker -c /app/rocketmq-all-4.9.8-bin-release/conf/2m-2s-async/broker-b.properties &
    nohup mqbroker -c /app/rocketmq-all-4.9.8-bin-release/conf/2m-2s-async/broker-a-s.properties &

  4. 检查集群服务状态 再启动完成之后,可以通过mq的命令行命令查看集群状态,bin文件夹下有mqadmin指令,执⾏这个指令需要在机器上配置了NAMESRV环境变量。通过该指令查看RocketMQ集群状态:

    1
    2
    3
    4
    5
    6
    [root@localhost 2m-2s-async]# mqadmin clusterList
    #Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
    DefaultCluster broker-a 0 192.168.30.4:10911 V4_9_8 0.00(0,0ms) 0.00(0,0ms) 0 478233.88 0.1400
    DefaultCluster broker-a 1 192.168.30.5:11011 V4_9_8 0.00(0,0ms) 0.00(0,0ms) 0 478233.88 0.1400
    DefaultCluster broker-b 0 192.168.30.5:10911 V4_9_8 0.00(0,0ms) 0.00(0,0ms) 0 478233.88 0.1400
    DefaultCluster broker-b 1 192.168.30.4:11011 V4_9_8 0.00(0,0ms) 0.00(0,0ms) 0 478233.88 0.1400
    mqadmin还有很多其他功能,比如新增Topic,查询消息等等,具体可以直接执行mqadmin并使用mqadmin help [COMMAND]来查看具体功能。当然我们之前搭建的dashboard会用起来更加清晰,我们这里就启动我们的dashboard来查看集群信息。

注意配置dashboard的Nameserver需要加上集群的Nameserver地址:

1
2
3
4
5
6
rocketmq:
config:
namesrvAddrs:
- node1:9876
- node2:9876
- node3:9876

在RocketMQ的这种主从架构的集群下,客户端发送的消息会分散保存到broker-a和broker-b两个服务上,然后每个服务都配有slave服务,可以备份对应master服务上的消息,这样就可以防⽌单点故障造成的消息丢失问题。

升级高可用集群

主从架构的RocketMQ集群给master节点配置了slave节点作为备份,可以保证broker上的消息不会丢失,但是master节点宕机后,消息就不再会往该broker发送,并且也无法从该broker的slave节点中读取出消息,只有master重启后才能正常被消费者读取。也就是说slave节点只能保证数据不丢失,但是无法保证服务高可用。

这时我们就希望集群能够自动将slave节点升级为master节点继续对外提供服务,这样整个集群的消息服务不会中断。而RocketMQ的Dledger集群就具备这种角色转换的功能。

在Dledger集群中,就不再单独指定各个broker的服务,而是由这些broker服务自己进行选举,产生1个Leader节点的服务,响应客户端的各种请求。其他的broker服务就作为Follower节点,负责对Leader上的数据进行备份。当然,Follower所要负责的事情相比主从架构中的SLAVE节点会要复杂一点,因为这种节点选举是在后端不断进行的,他们需要随时做好升级成Leader的准备。

接下来我们就使用之前的3台服务器来搭建一个3个节点的Dledger集群,这个集群中只要有2台服务正常运行,这个集群就能正常工作。

  1. 部署nameserver 这一步可以使用之前的nameserver,实际上nameserver是会自动感知broker的变化的,搭建的时候都不需要重新启动。

  2. 对Broker服务进行配置 和主从结构一样,RocketMQ给出了搭建Dledger集群的配置样例,在conf/dledger目录下RocketMQ给出了3个配置文件,这3个文件可以在单机上直接部署成一个具有3个服务的Dledger集群,我们按照此配置进行略微修改。 对于node1的broker.conf配置如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    brokerClusterName=RaftCluster
    brokerName=RaftNode00
    listenPort=30911
    namesrvAddr=node1:9876;node2:9876;node3:9876
    storePathRootDir=/app/rocketmq/storeDledger/
    storePathCommitLog=/app/rocketmq/storeDledger/commitlog storePathConsumeQueue=/app/rocketmq/storeDledger/consumequeue
    storePathIndex=/app/rocketmq/storeDledger/index
    storeCheckpoint=/app/rocketmq/storeDledger/checkpoint
    abortFile=/app/rocketmq/storeDledger/abort
    enableDLegerCommitLog=true
    dLegerGroup=RaftNode00
    dLegerPeers=n0-node1:40911;n1-node2:40911;n2-node3:40911
    ## must be unique
    dLegerSelfId=n0
    sendMessageThreadPoolNums=16
    其中和Dledger集群相关的参数:

  • dLedgerGroup: Dledger Raft Group的名字,建议跟brokerName保持⼀致。
  • dLedgerPeers: Dledger Group内各个服务节点的地址及端口信息。同一个Group内的各个节点配置必须要保持一致。
  • dLedgerSelfId: Dledger节点ID,必须属于dLedgerPeers中的一个。同一个Group内的各个节点必须不能重复。 node2和node3就只需要把对应的n0修改为n1n2即可。
  1. 启动服务
    1
    nohup mqbroker -c /app/rocketmq-all-4.9.8-bin-release/conf/dledger/broker.conf &
    启动完成之后我们使用mqadmin查看集群状态:
    1
    2
    3
    4
    5
    [root@localhost dledger]# mqadmin clusterList
    #Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
    RaftCluster RaftNode00 0 192.168.30.5:30911 V4_9_8 0.00(0,0ms) 0.00(0,0ms) 0 478236.00 0.1400
    RaftCluster RaftNode00 1 192.168.30.3:30911 V4_9_8 0.00(0,0ms) 0.00(0,0ms) 0 478236.00 0.1400
    RaftCluster RaftNode00 2 192.168.30.4:30911 V4_9_8 0.00(0,0ms) 0.00(0,0ms) 0 478236.00 0.1500
    此时node3机器上的broker被选成了master(id为0,这个id和配置的dLedgerSelfId没有关联),如果此时关闭node3机器上的broker,就会发现node1和node2中被随机出来一个节点作为了master。如果继续关闭一个服务器上的broker,那么集群会因为可用服务少于半数而无法正常工作。

Dledger集群机制是RocketMQ4.5版本开始⽀持的⼀个重要特性。他其实是由OpenMessage组织带入RocketMQ的一个系列框架。他是一个为高可用、高性能、高可靠的分布式存储系统提供基础支持的组件。他做的事情主要有两个,一是在集群中选举产生master节点。RocketMQ集群需要用这个master节点响应客户端的各种请求。二是在各种复杂的分布式场景下,保证CommitLog日志文件在集群中的强一致性。

Dledger集群背后的核心就是Raft协议。这是一种强大的分布式选举算法,其核⼼是只要集群中超过半数的节点作出的共同决议,就认为是集群最终的共同决议。Kafka也在之后基于Raft协议,自行实现了Kraft集群机制。

Dledger集群由于会接管RocketMQ原生的文件写入,所以,Dledger集群的文件写入速度比RocketMQ的原生写入方式是要慢一点的。这会对RocketMQ的性能产生一些影响。所以,当前版本的Dledger集群在企业中用得并不是太多。5.0版本对Dledger集群抽出了一种Dledger Controller模式,也就是只用Dledger集群的选举功能,而不用他的Commit文件写入功能,这样性能可以得到一定的提升。

总结RocketMQ的运行架构

通过上面简单的实验,我们已经对RocketMQ的运行机制有了一个大概的了解。接下来我们梳理一下RocketMQ中各个组件的作用:

  1. nameServer服务 nameServer不依靠任何其他服务自己就能独立启动。并且不管是broker还是客户端,都需要明确指定nameServer地址。整个RocketMQ集群都要在nameServer的协调下才能正常工作。
  2. broker服务 broker服务是整个集群中设计最为繁琐的部分,最核心的消息存储、传递、查询等功能都由Broker来提供。
  3. client客户端 client客户端包括生产者和消费者。从生产者产生的数据经过nameServer的调度在broker中进行分发给对应的消费者。

理解RocketMQ的消息模型

在学习消息模型之前,我们先往之前的两个节点(2m-2s)的集群发送一些消息。

1
tools.sh org.apache.rocketmq.example.quickstart.Producer
这个脚本会执行Producer测试类往服务端发送1000条消息,消息发送后可以在控制台看到SendResult信息:
1
SendResult [sendStatus=SEND_OK, msgId=7F000001097A72EA2F77708EA51E03E7, offsetMsgId=C0A81E0500002A9F0000000000017608, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=1], queueOffset=124]
其中就有topic,messageQueue等概念,我们打开dashboard,在【主题】菜单中可以看到多了一个名为TopicTest的主题,这就是运行测试类所创建的主题。点击【状态】可以看到主题上的消息分布。

可以看到broker-a和broker-b上各有4个队列,每一个队列分有125条消息,其中offset可以理解和Kafka一样是消息的索引,最大offset就是当前最新接收到消息保存的位置。回头来看之前日志中打印的SendResult的信息。日志中的MessageQueue就代表这一条消息存在哪个队列上了。而queueOffset就表示这条消息记录在MessageQueue的哪个位置。

我们再启动一个消费者来消费消息:

1
tools.sh org.apache.rocketmq.example.quickstart.Consumer

Consumer启动完成后,我们可以在控制台看到很多类似这样的⽇志:

1
ConsumeMessageThread_please_rename_unique_group_name_4_1 Receive New Messages: [MessageExt [brokerName=broker-a3, storeSize=192, queueOffset=47, sysFlag=0, bornTimestamp=1721651595296, bornHost=/192.168.30.3:33990, storeTi21651595170, storeHost=/192.168.30.4:10911, msgId=C0A81E0400002A9F0000000000008F0A, commitLogOffset=36618, body7070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={=0, MAX_OFFSET=125, CONSUME_START_TIME=1721651995917, UNIQ_KEY=7F000001097A72EA2F77708EA020017D, CLUSTER=DefaulTAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51, 56, 49], transactionI]
这里面也打印出了一些我们刚刚熟悉的brokerName,queueId,queueOffset这些属性。其中queueOffset属性就表示这一条消息在MessageQueue上的存储位点。通过记录每⼀个消息的Offset偏移量,RocketMQ就可以快速的定位到这一条消息具体的存储位置,继续正确读取到消息的内容。

我们可以从dashboard的【消费者】菜单中验证一下消费情况,点击对应消费者后,可以看到从中看到消费了TopicTest主题的消息,并且启动了一个叫做please_rename_unique_group_name_4的消费者组,并且消费都是以组消费的,我们可以自定义一个新的消费组来消费这个主题的信息。这时RocketMQ就会单独记录新消费组的消费进度,而新的消费组也可以消费到TopicTest的全部消息。

对上面的现象进行梳理,我们可以得出模型。生产者和消费者都可以指定一个Topic发送消息或者拉去消息,而Topic是一个逻辑概念,Topic中的消息被分布在后面多个MessageQueue中,这些MessageQueue又会分布在一个或者多个broker上,这一点和Kafka非常相似。

在RocketMQ的这个消息模型当中,最为核心的就是Topic。对于客户端,Topic代表了一类有相同业务规则的消息。对于Broker,Topic则代表了系统中一系列存储消息的资源。所以,RocketMQ对于Topic是需要做严格管理的。如果任由客户端随意创建Topic,那么服务端的资源管理压力就会非常大。默认情况下,Topic都需要由管理员在RocketMQ的服务端手动进行创建,然后才能给客户端使用的。而我们之前在broker.conf中手动添加的autoCreateTopic=true,就是表示可以由客户端自行创建Topic。这种配置方式显然只适用于测试环境,在生产环境不建议打开这个配置项。

对于业务来说,最为重要的就是消息Message了。生产者发送到某一个Topic下的消息,最终会保存在Topic下的某一个MessageQueue中。在消费者来消费消息时,RocketMQ会在Broker端给每个消费者组记录一个消息的消费位点Offset。通过Offset控制每个消费者组的消息处理进度。这样,每一条消息,在一个消费者组当中只被处理一次。

小结

这一章节,主要是快速熟悉RocketMQ产品,并通过操作,理解总结RocketMQ的运行架构以及消息模型。这些抽象的模型和架构和Kafka有很多相似之处,可以与Kafka进行对比学习。当然,RocketMQ相比Kafka拥有更多业务功能,能够满足各类业务场景。