RocketMQ使用指南2——消息模型和Spring整合


目录

  1. RocketMQ简介和集群架构与原理

  2. 理解RocketMQ的消息模型并与Spring集成(本篇)

  3. RocketMQ源码解析

  4. RocketMQ生产环境常见问题总结

主要内容

这一部分主要是了解RocketMQ的消息模型以及使用SpringBoot与服务端交互,主要就是为了学会项目上如何使用RocketMQ。

理解RocketMQ的消息模型

我们从Java的客户端来演示RocketMQ的各项功能,从而进一步理解RocketMQ的消息模型。

基本客户端使用

新建一个Maven项目之后,引入客户端的核心依赖和权限控制相关的依赖。

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.8</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.9.8</version>
</dependency>

一个简单的消息生产者代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class BasicProducer {
public static void main(String[] args) {
// 初始化一个消息生产者
DefaultMQProducer producer = new DefaultMQProducer("basic_producer_group_name");
producer.setNamesrvAddr("192.168.30.3:9876");
try {
// 启动生产者
producer.start();
// 创建消息对象
Message msg = new Message("basic_topic", "basic_tag_A", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
producer.shutdown();
}
}
}

一个简单的消息消费者代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) {
// 构建⼀个消息消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("basic_consumer_group_name");
// 指定nameserver地址
consumer.setNamesrvAddr("192.168.30.3:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
try {
// 订阅⼀个消息队列
consumer.subscribe("basic_topic", "*");
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, consumeConcurrentlyContext) -> {
messageExtList.forEach(messageExt -> {
System.out.println(new String(messageExt.getBody()));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者
consumer.start();
}
catch (Exception e) {
e.printStackTrace();
}
}

RocketMQ的客户端编程模型相对比较固定,其实和其他消息中间件差不多,都有一个固定步骤。

  • 消息生产者的固定步骤
    1. 创建消息生产者Producer,指定生产者组名
    2. 指定NameServer地址
    3. 启动Producer
    4. 创建消息对象,指定主题Topic、Tag和消息体
    5. 发送消息
    6. 关闭生产者Producer,释放资源
  • 消息消费者的固定步骤
    1. 创建消费者Consumer,必须指定消费者组名
    2. 指定NameServer地址
    3. 订阅主题Topic和Tag
    4. 设置回调函数,处理消息
    5. 启动消费者Consumer。消费者会一直挂起持续接收消息。

其中最关键的就是NameServer,RocketMQ的客户端只需要指定NameServer地址,而不需要指定具体的Broker地址。我们在学习Kafka的设计中可以知道Kafka原来是指定的ZooKeeper地址(相当于是NameServer),后面版本是指定的Broker地址,这里双方就有不同点。

指定NameServer的方式有两种,一种是在客户端指定,例如consuemr.setNameSrvAddr("node1:9876"),或者通过读取系统环境变量来指定。

消息确认机制

RocketMQ要支撑阿里的互联网金融场景,那么消息的安全性是必须优先进行保障的。而消息安全在客户端和服务端交互的时候有两个方面的要求,一个是生产者确保消息能够发送到Broker上,另一方面是消费者要能够确保从Broker上获取到消息。

生产者消息确认

针对消息的发送,RocketMQ客户端有三种选项可以给大家选择:

  1. 单向发送:

    单向发送的情况下,消息生产者只管往Broker里发送消息,而完全不管Broker是否成功接收到消息。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public static void main(String[] args) {
    // 初始化一个消息生产者
    DefaultMQProducer producer = new DefaultMQProducer("basic_producer_group_name");
    producer.setNamesrvAddr("192.168.30.3:9876");
    try {
    // 启动生产者
    producer.start();
    // 创建消息对象
    Message msg = new Message("basic_topic", "basic_tag_A", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
    // 发送消息
    producer.sendOneway(msg);
    } catch (Exception e) {
    throw new RuntimeException(e);
    } finally {
    producer.shutdown();
    }
    }

    sendOneway方法没有返回值,如果失败,生产者也无法挽回。这种方式发送的效率最高,可以用于追求发送效率并且允许消息丢失的场景。

  2. 同步发送:

    同步发送的情况下,生产者现场会等待broker的返回,直到broker返回接收结果,生产者才继续工作。代码就是一开始写的最基础的代码,直接使用send方法。

    1
    SendResult sendResult = producer.send(msg);

    send方法会有一个返回值,可以查看这个SendResult类中对于broker处理状态枚举类的属性。

    1
    2
    3
    4
    5
    6
    public enum SendStatus {
    SEND_OK, // 发送成功
    FLUSH_DISK_TIMEOUT, // 刷盘超时
    FLUSH_SLAVE_TIMEOUT, // 同步slave节点超时
    SLAVE_NOT_AVAILABLE, // slave节点无效
    }

    其中很明显只有SEND_OK说明接收成功,如果生产者接收到的状态结果不是这个,就可以对发送失败的消息进行补救,比如重新发送。注意这里返回的状态只是broker接收成功,并不代表消费者能消费。并且状态如果是失败,broker也不是一定没有接收消息,补发可能会造成消息重复的可能,所以重新发送需要带上唯一标识,这样在消费者端才能做幂等判断。

    这种同步发送的方式能够很大程度上保证消息发送的安全性,但是这种同步发送机制的发送效率比较低,send方法需要等待broker处理结果返回后才能继续执行,如果网络情况不佳,耗时会很长。

  3. 异步发送:

    在异步发送机制下,生产者向Broker发送消息是,会同事注册一个回调函数。接下来生产者并不等待broker返回响应而是继续执行,当broker返回结果之后再出发回调函数进行处理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    // 启动生产者
    producer.start();
    CountDownLatch countDownLatch = new CountDownLatch(10);
    for (int index = 0; index < 10; index++) {
    // 创建消息对象
    Message msg = new Message("basic_topic", "basic_tag_A", ("Hello RocketMQ " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));
    // 发送消息
    int finalIndex = index;
    producer.send(msg, new SendCallback(){
    @Override
    public void onSuccess(SendResult sendResult) {
    // broker处理成功
    countDownLatch.countDown();
    System.out.printf("%-10d OK %s %n", finalIndex, sendResult.getMsgId());
    }

    @Override
    public void onException(Throwable e) {
    // broker处理失败
    countDownLatch.countDown();
    System.out.printf("%-10d Exception %s %n", finalIndex, e);
    e.printStackTrace();
    }
    });
    }
    // 都返回后释放资源
    if (countDownLatch.await(5, java.util.concurrent.TimeUnit.SECONDS)) {
    producer.shutdown();
    }

    需要注意的是,和同步发送机制类似,触发了onException方法并不一定是处理失败,如果只是broker处理太慢也会导致超时,使用设置producer.setSendMsgTimeout(3000);可以自定义超时时间。

    另外可以注意到上面的代码在发送时使用了CountDownLatch,因为只有在接收到所有结果之后才能关闭消费者主线程,否则启动的子线程也会随之关闭,就无法响应broker的返回结果了。

    异步发送的机制是很多消息队列客户端都支持的方式,也是能够适应大部分场景的方式。当然,针对不同的业务场景需要更细致的考虑。

消费者消费确认

对于消费者来说,要保证消费者消费消息,那就需要broker端等待客户端返回结果。

1
2
3
4
5
6
7
8
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, consumeConcurrentlyContext) -> {
messageExtList.forEach(messageExt -> {
System.out.println(new String(messageExt.getBody()));
});
// 返回处理结果
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

这里返回的枚举值有两个选项:CONSUME_SUCCESSRECONSUME_LATER。如果返回了CONSUME_SUCCESS那么说明消息消费成功,如果返回了CONSUME_LATER那么broker就会过一点时间再发起重试。

为了兼容重试机制的成功率和性能,RocketMQ设计了一套非常完善的消息重试机制。

  1. Broker不能无限制的向消费失败的消费者推送消息。如果消费者一直没有恢复,broker不可能不限制的推送。所以broker记录每个消息的重试次数,如果重试达到一定的次数这条消息就会被推入死信队列。待到消费者恢复正常,再人工从死信队列中对消息进行处理,可以是重新扔回正常的队列,也可以使用新的消费者消费他们,甚至直接删除。RocketMQ默认的最大重试次数为16次。
  2. 为了让这些重试的消息不会影响正常Topic下的其他正常消息,broker会给每一个消费者组设计对应的重试Topic,因为MessageQueue是一个具有严格FIFO特性的数据结构,如果在原队列上进行重试会导致长时间阻塞,影响性能。在消息需要进行重试时,会先移动到对应的重试Topic中,后续broker只要从这些重试Topic中不断拿出消息并往消费者组不断推送即可。
  3. RocketMQ中设定的消费者组都是订阅主题和消费逻辑相同的服务备份,所以当消息进行重试时,broker只要把消息往消费者组中的任意一个推送即可,并不保证是上一次处理消息的消费者。这里就要注意,消费者组内的不同实例应当是逻辑相同的消费者服务,否则可能在业务上出现处理不一致的情况。
  4. Broker端只能通过消费者返回的状态来确定消息有没有处理成功。至于消费者自己的业务执行是否出现异常,broker是无法感知的。因为消费者在处理业务逻辑时应尽量使用同步实现方式,保证在自己业务处理完成之后再向broker端返回状态。如果是异步处理业务逻辑,那么需要对异常进行捕获和监控。

消费者自行设置消费起点

Broker通过Consumer返回的状态来推进所属消费者组对应的Offset。但是,这里会发现Consumer处理消息但是哪些消息需要处理又是由broker管理的,如果消息出现问题需要查询历史信息,又该怎么办。为此,RocketMQ虽然由broker管理offset,Consumer依然能够指定从某一个offset进行消费。在Consumer的属性中,可以设置一个ConsumeFromWhere属性,用于确定消费点位。

1
2
3
4
5
6
7
8
public enum ConsumeFromWhere {
CONSUME_FROM_LAST_OFFSET, // 从上一次消费到的地方开始消费
CONSUME_FROM_FIRST_OFFSET, // 从队列的第一条消息开始重新消费
CONSUME_FROM_TIMESTAMP; // 从某一个时间戳开始消费

private ConsumeFromWhere() {
}
}

当然,如果设置了ConsumeFromWhere.CONSUME_FROM_TIMESTAMP那么需要传入一个时间戳:

1
defaultConsumer.setConsumeTimestamp("20191024171201");

广播消息

我们在RabbitMQ中学习过fanout交换机用于广播消息,它的核心是把消息发送的所有绑定的队列上。在Kafka中,我们需要使用多个消费者组去消费同一个Topic来实现广播。而在RocketMQ中,我们只需要设置消费者的消费模式为consumer.setMessageModel(MessageModel.BROADCASTING);,这样一个消息就会被多个消费者同时消费,不在关心消费者组的概念。

默认的集群模式(MessageModel.CLUSTERING)下,broker端维护每一个ConsumerGroup的Offset,这个offset可以保证消息在一个ComsumerGroup内只被消费一次。而广播模式下,broker将offset的管理交给消费者端,broker只管向所有消费者推送消息,不再负责维护消费进度。

注意,由于broker不再维护消费进度,所以消费者如果消费失败了,重试就只能由消费者自行处理。如果消费者重启,可以按照自行保存的上一次消费进度开始,处理后面没有消费过消息。不过一旦offset文件丢失,那么消费者就无法得知该从哪里开始消费了。

消费者的offset是保存在/home/{user}/.rocketmq_offsets 中,对应的路径可以参考org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore类中的写法:

1
2
3
4
this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
this.mQClientFactory.getClientId() + File.separator +
this.groupName + File.separator +
"offsets.json";

客户端在运行时会不断尝试本地offset文件的写入,但是如果写入失败不会进行任何补救。

顺序消息机制

队列,很显然是一个FIFO的数据结构。但是消息队列对于顺序消息却不是默认实现的,比如Kafka就很难实现消息的顺序机制。RocketMQ提供了一种方式来实现顺序消费,见示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 启动生产者
producer.start();
for (int businessId = 0; businessId < 10; businessId++) {
// 模拟10次业务过程
for (int j = 0; j < 5; j++) {
// 模拟1次业务的5个流程
String progressId = businessId + "_" + j;
// 创建消息对象
Message msg = new Message("consume_orderly_topic", "tag_business_" + businessId, "key_" + progressId, ("Hello RocketMQ " + progressId).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> {
// 这里的arg就是send方法的第三个参数businessId
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}, businessId);
System.out.printf("%s%n", sendResult);
}
}

这里可以看到我们使用了org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message, org.apache.rocketmq.client.producer.MessageQueueSelector, java.lang.Object)方法来发送消息,这里第二个参数可以定义一个选择发送队列的选择器,选择器的实现中,我们根据业务的标识进行hash,保证同一个业务的多个流程都是发往broker中同一个队列的,这样可以保证这些消息发送到broker存储时是顺序的。

下面查看消费者客户端的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 订阅⼀个消息队列
consumer.subscribe("consume_orderly_topic", "*");
// 注册消息监听器,这里监听器使用MessageListenerOrderly
consumer.registerMessageListener((MessageListenerOrderly) (messageExtList, consumeOrderlyContext) -> {
consumeOrderlyContext.setAutoCommit(true);
messageExtList.forEach(messageExt -> {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + messageExt.getQueueId() + ", content:" + new String(messageExt.getBody()));
});
return ConsumeOrderlyStatus.SUCCESS;
});
// 启动消费者
consumer.start();

这里可以看到我们使用了org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#registerMessageListener(org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly)方法来消费消息。和之前默认的监听不同,使用MessageListenerOrderly监听器不会启动并发的线程去拉取消息,而是使用一个线程去依次消费。当然在使用一个线程去消费的时候,一条消息在处理过程中时监听是阻塞的,需要处理一条消息之后才会拉取下一条消息,所以在这种场景下一般不会使用异步处理消息,也不会使用重试机制。

注意点:

  1. 这边的有序保证的是局部有序,大部分业务场景下也是满足局部有序即可。如果要保持全局有序那么只能使用一个队列,效率会大大降低。
  2. 在对消息进行业务分组时,需要尽量将消息分散到不同的队列上来提高消费效率。
  3. 消费者端只能用同步方式处理,如果使用异步处理那么依然有可能导致顺序不一致。
  4. 消费者不能无限重试。如果消息处理失败,后面的消息都会阻塞,但消息失败次数达到RocketMQ最大重试次数跳过此消息,处理后面消息,就会导致消息乱序。
  5. 因为上一点,若消费者端如果确实处理逻辑出现问题,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代。

延迟消息

延迟消息是指消息发送到消息队列之后不是马上投递消息,而是等待一段时间之后才投递到Consumer进行消费。RabbitMQ中可以通过消息的过期时间来利用死信队列的机制实现延迟消息,而Kafka则不太好实现这个功能。RocketMQ原生就可以支持。

代码中需要做的修改就只有一个,对消息设置一个延迟级别。

1
msg.setDelayTimeLevel(2);

这里的延迟级别是和RocketMQ里设置的级别对应的,见MessageStoreConfig,如下:

1
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

现在RocketMQ并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18。18个等级无法调整,但是可以调整每个级别的延迟时间,总体来说还是不太灵活。在5.x的版本中,已经支持已经可以按秒的精度进行定时发送。

批量消息

生产者要发送多条消息时,可以将多条消息合并发送,减少网络开销,这算是一种常见的优化方式。

1
2
3
4
5
6
7
8
9
// 创建消息对象
List<Message> msgList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Message msg = new Message("batch_topic", "batch_tag_A",
("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
msgList.add(msg);
}
// 发送消息
SendResult sendResult = producer.send(msgList);

批量消息的使用有一定的限制,同一批消息的Topic必须相同,并且不支持延迟消息。此外,这一批消息的总大小不应超过4MB。

过滤消息

同一个Topic下的所有消息也不一定是都是某一个消费者想要的,如果一个消费者想要特定属性的消息,那么就需要在发送阶段使用特殊的标签来标识。

  • 简单过滤:

    生产者在发送消息时,使用Tag属性。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 启动生产者
    producer.start();
    // 指定Tag属性
    String[] tags = {"basic_tag_A", "basic_tag_B", "basic_tag_C", "basic_tag_D"};
    // 创建消息对象
    for (String tag : tags) {
    Message msg = new Message("basic_topic", tag, "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
    // 发送消息
    SendResult sendResult = producer.send(msg);
    System.out.printf("%s%n", sendResult);
    }

    消费者在消费时根据Tag订阅消息:

    1
    2
    // 订阅⼀个消息队列
    consumer.subscribe("basic_topic", "basic_tag_A");

    这样,Consumer就只会处理Tag为basic_tag_A的消息。

  • SQL过滤

    通过Tag属性只能进行简单的消息匹配。如果需要使用数字比较、模糊匹配等,就需要使用SQL过滤方式。SQL过滤方式可以通过Tag属性以及用户自定义的属性一起,使用标准的SQL方式进行消息过滤。

    生产者在发送时除了Tag属性还要添加自定义属性:

    1
    2
    3
    4
    // 创建消息对象
    Message msg = new Message("basic_topic", "basic_tag_A", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
    // 设置自定义属性
    msg.putUserProperty("ATTRIBUTE", "1");

    消费者在消费时创建一个SQL过滤器订阅消息:

    1
    2
    3
    // 订阅⼀个消息队列,使用sql过滤器过滤
    consumer.subscribe("basic_topic", MessageSelector.bySql(
    "(TAGS in ('basic_tag_A', 'basic_tag_B') AND (ATTRIBUTE is not null and ATTRIBUTE between 0 and 3))"));

    这个SQL语句的处理使用的是开源的ANLTR框架,这个框架也被用于ShardingSphere和Flink。

    注意点:

    1. 使用Tag过滤时,如果希望匹配多个Tag,可以使用两个竖线(||)连接多个Tag。另外,可以使用星号(*)匹配所有。
    2. 使用SQL过滤时,SQL语句按照SQL92标准,支持常见的SQL操作。如数值比较(>,>=,<,<=,BETWEEN,=)、字符比较(=,<>,IN)、空值(IS NULL, IS NOT NULL)、逻辑运算(AND, OR, NOT)。
    3. 消息过滤可以选择在消费者端做也可以选择在生产者端做。消费者端做就把消息标记不成功直接返回,这样可以减少服务端压力。我们使用服务端过滤可以提前完成过滤判断,减少不必要的IO,在RocketMQ服务端良好设计的前提下,应该尽量使用服务端过滤。
    4. Consumer端不关心的数据不代表丢弃,应该在同一个消费者组内有其他消费者来消费掉这消息。

事务消息

事务消息是用来保证上下游的数据一致性的,在电商等领域的场景非常需要这个功能来将子系统的数据变更串联到一起。

考虑到事务的安全性,那么要保证相关联的n个业务是同时成功或者失败的。假设一个流程的过程为主线程提交到RocketMQ(A1),3个消费者进行消费(A1.1,A1.2,A1.3),主线程执行A2。如要要把3个消费者服务和主线程用一个分布式事务来控制会很麻烦,但是如果使用RocketMQ串联起来就会比较简化。根据RocketMQ的消息确认以及重试机制,我们可以认为A1到A1.1、A1.2、A1.3这中间是保证最终数据一致性的。而此时只要再解决A1和A2的事务问题就可以了。由此,RocketMQ提出了两阶段提交的思路来实现事务一致性。

实现思路是这样的:

  1. 生产者把消息发送到RocketMQ服务端。
  2. 服务端将消息持久化之后向生产者确认消息已经发送成功,此时消息在服务端被标记为暂不能投递,也称半事务消息。
  3. 生产者执行本地逻辑。
  4. 生产者根据本地逻辑的执行结果向服务端提交二次确认结果(Commit或者Rollback),服务端接收并进行处理。
    • Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • Rollback:服务端将事务回滚,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

代码上,我们通过实现RocketMQ提供的接口TransactionListener来实现提交半事务后的业务执行和回查逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);

private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

/**
* 提交半事务消息后执行本地逻辑
* @param message 消息实体
* @param o org.apache.rocketmq.client.producer.TransactionMQProducer#sendMessageInTransaction(org.apache
* .rocketmq.common.message.Message, java.lang.Object)中传入的自定义参数
* @return 事务状态
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(message.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}

/**
* 服务端发起回查时客户端执行的代码
* @param messageExt 回查的消息
* @return 事务状态
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
Integer status = localTrans.get(messageExt.getTransactionId());
if (null != status) {
return switch (status) {
case 0 -> LocalTransactionState.UNKNOW;
case 1 -> LocalTransactionState.COMMIT_MESSAGE;
case 2 -> LocalTransactionState.ROLLBACK_MESSAGE;
default -> LocalTransactionState.COMMIT_MESSAGE;
};
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}

生产者只需要把这个监听设置给对应RocketMQ提供的TransactionMQProducer类即可:

1
2
3
4
5
6
7
8
9
10
11
12
// 初始化一个消息生产者
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group_name");
// 创建事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
// 创建执行监听器中代码的线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
});
producer.setExecutorService(executorService);
// 后面发送消息和普通的一样

注意点:

  1. 半消息是对消费者不可见的消息,实际上,RocketMQ是将消息转到了一个系统Topic,RMQ_SYS_TRANS_HALT_TOPIC。然后二次提交的时候根据offset拿到消息执行最终的投递。参考org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#commitMessage
  2. 事务消息中,本地事务回查次数通过参数transactionCheckMax设定,默认15次。本地事务回查的间隔通过参数transactionCheckInterval设定,默认60秒。超过回查次数后,消息将会被丢弃。

ACL权限控制

RocketMQ提供了针对队列、用户等不同维度的非常全面的权限管理机制。通常来说RocketMQ作为一个内部服务是不需要进行权限控制的,但是如果消息队列是需要跨部门甚至跨公司合作使用的,那么权限控制的重要性就体现出来了。

  1. 在控制平台可以针对Topic配置权限,比如设置这个Topic禁止读写、可读不可写、可读可写。

  2. 通过Broker端的配置文件可以更加详细的进行权限控制。打开broker.confaclEnable配置,然后通过plain_acl.yml文件进行配置,并且这个文件是热加载的,修改权限不需要重启服务。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    accounts:
    - accessKey: RocketMQ
    secretKey: 12345678
    whiteRemoteAddress: 192.168.0.*
    admin: false
    defaultTopicPerm: DENY
    defaultGroupPerm: SUB
    topicPerms:
    - topicA=DENY
    - topicB=PUB|SUB
    - topicC=SUB
    groupPerms:
    # the group should convert to retry topic
    - groupA=DENY
    - groupB=SUB
    - groupC=SUB

    - accessKey: rocketmq2
    secretKey: 12345678
    whiteRemoteAddress: 192.168.1.*
    # if it is admin, it could access all resources
    admin: true

    配置了权限信息之后,客户端再使用就需要通过accessKey和secretKey来提交身份信息了。引入对应的依赖包,并在声明生产者的时候传入RPCHook。

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    <version>4.9.8</version>
    </dependency>
    1
    2
    3
    4
    5
    6
    // 声明身份信息
    String ACL_ACCESS_KEY = "RocketMQ";
    String ACL_SECRET_KEY= "12345678";
    RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
    // 初始化一个消息生产者
    DefaultMQProducer producer = new DefaultMQProducer("acl_producer_group_name", rpcHook);

SpringBoot整合RocketMQ

快速搭建

首先,pom文件引入相关包:

1
2
3
4
5
6
<!-- 添加RocketMQ 依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>

编写启动类:

1
2
3
4
5
6
@SpringBootApplication
public class RocketMQSpringbootClientApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQSpringbootClientApplication.class, args);
}
}

配置文件:

1
2
3
4
rocketmq.name-server=192.168.30.3:9876
rocketmq.producer.group=springBootGroup
#rocketmq.consumer.topic=
rocketmq.consumer.group=testGroup

这里注解可以不配,但是就需要在消费者类的注解上配。

生产者很简单,直接注入使用RocketMQTemplate类:

1
2
3
4
5
6
7
8
9
@Component
public class BasicProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;

public void sendMessage(String topic, String msg) {
this.rocketMQTemplate.convertAndSend(topic, msg);
}
}

消费者实现提供的RocketMQListener接口:

1
2
3
4
5
6
7
8
9
10
@Component
@RocketMQMessageListener(consumerGroup = "BasicGroup", topic =
"BasicTopic", consumeMode = ConsumeMode.CONCURRENTLY, messageModel =
MessageModel.BROADCASTING)
public class BasicConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received message : " + message);
}
}

处理不同消息类型

简单的使用

可以参考官方使用手册

注意点

一个RocketMQTemplate实例只能包含一个生产者,也就只能往一个Topic下发送消息。如果需要往另外一个Topic发送消息,需要使用@ExtRocketMQTemplateConfiguration注解并配置地址配合使用:

1
2
3
@ExtRocketMQTemplateConfiguration(nameServer = "${rocketmqB.name-server}",group = "${rocketmqB.producer.group}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
1
2
3
4
5
6
7
8
9
@Component
public class ExtProducer {
@Resource(name = "extRocketMQTemplate")
private RocketMQTemplate extRocketMQTemplate;

public void sendMessage(String topic, String msg) {
this.extRocketMQTemplate.convertAndSend(topic, msg);
}
}

实现原理

Push模式

Push模式就是上节实例中使用@RocketMQMessageListener注解声明的消费者。在Springboot的启动中,会对配置类进行加载,其中ListenerContainerConfiguration类负责了对这些消费者的初始化。

由于实现了SmartInitializingSingleton接口,在Spring容器完成非懒加载实例的加载后会调用afterSingletonsInstantiated()方法。我可以看到这个方法中会查找所有带有@RocketMQMessageListener注解的类并加入一个内部的map,并遍历调用registerContainer方法:

1
2
3
4
5
6
7
8
@Override
public void afterSingletonsInstantiated() {
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
.entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

beans.forEach(this::registerContainer);
}

registerContainer方法比较重要,我们看一下大致的流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void registerContainer(String beanName, Object bean) {
// ...
// 获得类上的注解
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
// ...
// 校验注解配置
validate(annotation);
// 将每个有注解的类转换成一个container并注册到上下文中
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class);
// 启动容器(启动消费者)
if (!container.isRunning()) {
try {
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}

log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}

这个方法中主要是创建了一个DefaultRocketMQListenerContainer容器,这个容器中包含了我们自己写的消费者类,所以可以推测和RocketMQ的API交互的代码会在这个类里面去实现。接下来我们就重点看DefaultRocketMQListenerContainer的代码。DefaultRocketMQListenerContainer实现了InitializingBean接口,所以他的afterPropertiesSet()方法是重点。可以看到在方法中调用了initRocketMQPushConsumer()方法,看方法名就可以猜出来这是初始化消费者的代码。

在这个方法中,终于看到了RocketMQ原生的DefaultMQPushConsumer消费者,并且可以看到一些我们之前在基本客户端使用时用到的类似代码,下面我们大致来看下重点代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
private void initRocketMQPushConsumer() throws MQClientException {
// ...
// 判断是否有权限控制并创建消费者类
if (Objects.nonNull(rpcHook)) {
consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), enableMsgTrace, this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
consumer.setVipChannelEnabled(false);
} else {
log.debug("Access-key or secret-key not configure in " + this + ".");
consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace, this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
}
consumer.setNamespace(namespace);
// 定制instanceName
consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
// ... nameSever以及消费者属性的设置
// 判断消费类型(广播、集群)
switch (messageModel) {
case BROADCASTING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
break;
case CLUSTERING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
break;
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
}
// 过滤方式
switch (selectorType) {
case TAG:
consumer.subscribe(topic, selectorExpression);
break;
case SQL92:
consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
break;
default:
throw new IllegalArgumentException("Property 'selectorType' was wrong.");
}
// 指定监听类(是否顺序消费)
switch (consumeMode) {
case ORDERLY:
consumer.setMessageListener(new DefaultMessageListenerOrderly());
break;
case CONCURRENTLY:
consumer.setMessageListener(new DefaultMessageListenerConcurrently());
break;
default:
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
}

// ...
}

Pull模式

Pull模式的实现是通过在RocketMQTemplate实例中注入DefaultLitePullConsumer实例来实现的。在注入了这个消费者实例之后,就可以使用template的receive()方法来调用消费者实例的poll()方法去主动Pull消息了。

我们从RocketMQTemplate类的receive()方法开始一步步找初始化的代码,可以看到receive()方法调用了this.consumer.poll(timeout);,其中变量consumer就是注入的消费者实例了,我们查找这个consumer的使用,找到setConsumer()方法的调用,可以看到在RocketMQAutoConfiguration这个配置类中对RocketMQTemplate实例的加载中会获取消费者实例并赋值给template实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Bean(destroyMethod = "destroy")
@Conditional(ProducerOrConsumerPropertyCondition.class)
@ConditionalOnMissingBean(name = ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
public RocketMQTemplate rocketMQTemplate(RocketMQMessageConverter rocketMQMessageConverter) {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
if (applicationContext.containsBean(PRODUCER_BEAN_NAME)) {
rocketMQTemplate.setProducer((DefaultMQProducer) applicationContext.getBean(PRODUCER_BEAN_NAME));
}
if (applicationContext.containsBean(CONSUMER_BEAN_NAME)) {
rocketMQTemplate.setConsumer((DefaultLitePullConsumer) applicationContext.getBean(CONSUMER_BEAN_NAME));
}
rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
return rocketMQTemplate;
}

其中,CONSUMER_BEAN_NAMEdefaultLitePullConsumer,所以我们找一下这个类的加载方法。依然是在RocketMQAutoConfiguration这个配置类中有defaultLitePullConsumer的初始化方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Bean(CONSUMER_BEAN_NAME)
@ConditionalOnMissingBean(DefaultLitePullConsumer.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"})
public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)
throws MQClientException {
// ... 读取配置项
// 创建实例
DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel, groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);
litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
litePullConsumer.setNamespace(consumerConfig.getNamespace());
return litePullConsumer;
}

其中RocketMQUtil.createDefaultLitePullConsumer()方法就是创建了一个DefaultLitePullConsumer实例,相当于原生API中的拉模式客户端。

最佳实践

合理分配Topic、Tag

一个应用尽可能使用一个Topic,而消息子类型可以使用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤。

虽然RocketMQ相较于Kafka在Topic过多的情况下性能已经有大幅优化,不会对消息转发的性能有影响,但是Topic过多还是会增加服务端对元数据维护的性能损耗,所以还是要尽量对Topic进行合理的分配。

对消息过滤时尽量使用Tag过滤,不要用复杂的SQL过滤,因为Tag是会写到索引里面的,这个在下一篇例会讲到。虽然使用SQL过滤能够减少更过的网络开销,但是毕竟会增加服务端的压力,所以消息过滤的逻辑还是越简单越好。

使用Key加快消息索引

分配好Topic和Tag之后,就要优化Key属性了,因为Key也可以参与消息过滤。通常每个消息要分配一个在业务层面的唯一标识码,设置到Key属性里。一方面可以配合Tag进行更精确的消息过滤,另一方面是RocketMQ服务端会对每个消息创建一个哈希索引,应用通过topic、key进行查询时,为了增加查询的效率,减小哈希冲突,客户端需要尽量保证key的唯一性。

关注错误消息重试

如果消费者端的消息处理失败了,服务端会将消息进行重新投送。实际上,服务端会给每一个消费者组创建一个重试队列,重试的消息会进入一个“%RETRY%” + ConsumerGroup的队列中。

平时需要多关注重试队列,如果这个队列中出现了大量的消息,说明消费者的运行出现了问题。

每条消息默认最多重试16次,重试的间隔根据延迟消息级别的后16个级别判断。

如果重试16次后仍然失败,那么消息会转到死信队列。重试次数是可以设置的,使用consumer.setMaxReconsumeTimes(20)设置重试次数为20次。当超过16次之后,间隔固定为2小时。要注意的是,设置这个消费者的重试次数会对GroupId相同的所有Consumer同时生效,也就是整个GroupId相同的消费者集群会以设置了这个参数的最后一个启动的消费者为准。

手动处理死信队列

消息进入死信队列说明在消费处理中这条消息出现了比较严重的错误,需要手动处理或者丢弃。

死信队列的名称是%DLQ%+ConsumerGroup,说明每一个死信队列对应一个ConsumerGroup,而不是对应某一个消费者实例。如果一个ConsumerGroup没有产生死信队列,RocketMQ也不会为其创建死信队列。而且一个消费者组内不管什么Topic的消息都在一个死信队列。

死信队列的消息不会再被消费者正常消费,默认也会因消息超时而被删除,默认为3天,对应broker.conffileReservedTime属性。

默认死信队列是禁读的,需要修改设置perm权限才能被消费。

幂等性控制

我们在不同的消息队列的讨论中都会提到幂等性控制:

  • at most once:使用异步发送或者sendOneWay可以保证
  • at least once:使用同步发送、事务消息等可以保证
  • exactly once:需要业务控制

在发送、消费、服务重启(扩容、缩容)等场景下可能会出现消息重复的情况。在RocketMQ商业版确实提供了exactly once的相关API,而我们自己在使用的时候更多的是要在业务上保证消息不重复消费。比如使用messageID进行幂等性判断,但是messageID不是全局唯一的,如果能在业务上确认一个全局唯一的ID,作为Message的Key来传递是比较靠谱的。