RabbitMQ使用指南1,2——简介和安装、原生Java客户端的使用


写在前面

之前自己学习了一下RMQ的使用,一直没有时间整理,现在抽空整理一下笔记。

主要内容

1.RabbitMQ简介和安装 这块主要是一个RabbitMQ所包含的内部元素的介绍(交换机、队列等等)。

2.原生Java客户端的使用 重要,主要理解是这一块。弄懂了原生的使用,基本上遇到问题也都能解决了。

2020-9-24 注:这边考虑了一下,模块1和模块2结合在一起比较好,所以总的就分两部分吧。

3.与Spring集成 其实就是把上一部分进行了封装,理解的话需要Spring相关知识,比如Bean的初始化,自动注入。使用比较无脑,cv就行。


RabbitMQ简介和安装

简介

正文开始,首先我们来了解一下什么MQ,为什么要用MQ。

什么是MQ?中文名消息中间件。一般来说是用于分布式系统中多个子系统之间高效的传递消息,主要关注的就是消息的接收与分发,用异步的方式完成各个子系统之间信息交流。基本上一般的MQ都能做到高效、可靠。

划重点:消息中间件不生产消息,只是消息的搬运工。

为什么要用MQ?之前所有业务都集成在一个系统里,渐渐地业务量提升、系统功能多样化,单个应用可能会导致更新困难、一旦某个功能挂了可能整个系统用不了,所以把一个系统拆分成多个子系统。那么这个时候就要解决子系统之间的通信问题。在信息量比较小的情况下,使用接口调用RPC是比较常见的方法,但是在信息量比较大的情况或者业务比较复杂涉及多个子系统的时候,就需要中间件来帮忙了。对于这种复杂的分布式系统,消息中间件可以降低子系统之间的耦合度(各个模块之间没有直接的调用,而是都通过MQ通信)、赋予子系统异步处理能力(预警系统要发短信,往MQ里扔一条消息就行了,可以接下去走其他逻辑,短信服务系统自己会从MQ中获取消息并发送短信)、赋予系统缓冲能力(突然要发几千几万条短信,调用方如果突然发几千几万个请求去短信服务让其发短信肯定会造成性能问题,可能直接就挂了。MQ就可以接受并把这些消息储存起来,就像一个蓄水池,做到“削峰平谷”)、赋予系统良好的扩展性(有新业务需要加入,比如本来订单系统需要给库存系统发送消息,现在又要发短信,只要多给MQ发一条给短信服务的消息,对原有的功能没有影响)、赋予系统良好的伸缩性(这个好理解,需要发短信的系统太多了,每分钟成千上万条,那当然就是加短信服务应用,处理不完就增加服务器好了)。

划重点:低耦合、异步通信、缓冲、伸缩性、扩展性。

应用场景

先略过吧。不是特别重要,后面的章节比较重要。

常用中间件比较

常用的4款MQ有:ActiveMQ、RabbitMQ、RocketMQ、Kafka。

ActiveMQ:性能最差,在业务量一定的情况下可以使用。 RabbitMQ:正统地结合AMQP协议,性能较好,比较成熟。但是底层是Erlang。 RocketMQ:阿里内部应用规模比较大,性能好,但是文档少,更新比较快(意思就是不太稳定、坑多,手动狗头)。要是Java技术强,公司有架构研发能力可以考虑。 Kafka:性能在百万级,大数据等领域可以使用。但是需要ZK支持。

这边的话。。ActiveMQ我就忽略了没看,RocketMQ大概了解了一下,Kafka也是没看,准备学了ZK再来看的,然后ZK的源码直接看晕了gg。所以只能讲讲RMQ了。

RabbitMQ安装

Linux安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
su - root
##下载erlang并安装
wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
yum install epel-release
yum install erlang
##下载rabbitmq并安装
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm
yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm
## 注意:不同的linux版本对应支持的rabbitmq版本不一样,不同rabbitmq对应的erlang版本也不一样。

##启动rmq
service rabbitmq-server start
service rabbitmq-server status
#安装管理控制台(就是15672端口那个)
rabbitmq-plugins enable rabbitmq_management
#重启RabbitMQ
service rabbitmq-server stop
service rabbitmq-server start

#开启相对应的端口
firewall-cmd --permanent --add-port=15672/tcp
firewall-cmd --permanent --add-port=5672/tcp

#查看已有虚拟主机并增加名为test的虚拟主机(后面会介绍什么是虚拟主机,可以理解为多个工作区域)
rabbitmqctl list_vhosts
rabbitmqctl add_vhost test
rabbitmqctl list_vhosts

#增加名为zm的用户并配置administrator角色,增加相应的权限
rabbitmqctl add_user zm 123456
#下面的test为虚拟主机
rabbitmqctl set_permissions -p test zm '.*' '.*' '.*'
rabbitmqctl set_user_tags zm administrator
#注意:系统默认的guest用户是不能进行远程登录的,除非另行配置相关参数。

Windows安装

下载Erlang并安装。安装程序名字应该是otp_win64_xxx.exe。 下载RabbitMQ-Server并安装。安装完之后在安装目录的rabbitmq_server-3.6.6\sbin文件夹下会有对应的.bat文件。

AMQP在RabbitMQ中的体现

客户端与RabbitMQ的通信

连接(Connections)

无论是生产者还是消费者,与RabbitMQ通信的时候都需要创建连接。只有创建完一条TCP连接,并且相互确认符合AMQP协议之后,才能在这个连接的基础上再进行其他操作,比如开始创建信道。 可以在RabbitMQ的管理界面中点击Connections查看所有建立的连接。 管理页面中的连接

信道(Channels)

信道是在连接的基础上创建的一个个通信的“虚拟连接”。因为连接资源是宝贵的,不可能每个线程都创建一个连接,所以RabbitMQ使用了“多路复用”的技术,对于一些多线程执行多个任务的应用来说,在一个TCP连接中创建多个信道,每个信道对应一个线程。 可以在RabbitMQ的管理界面中点击Connections查看所有建立的连接(上图),图中每个连接里面有个Channels列就是创建的信道数。 可以在RabbitMQ的管理界面中点击Channels查看所有建立的信道。

管理页面中的信道

RabbitMQ中的几个要素

生产者(Producer): 消息的创建者,发送消息到RabbitMQ。

消费者(Consumer): 连接到RabbitMQ,订阅某个队列,消费消息。

消息(Message):包含传输的数据信息和标签。标签用来确定消息发送给哪个队列。获取到消息的消费者只会得到传输的数据信息。

交换机(Exchanges):所有发送的消息都会确定一个它所到达的交换机,然后当消息被发送到RabbitMQ就会先到这个交换机。就像要从国内发一封信到国外,这封信是去浦东机场坐飞机去国外还是去虹桥机场坐飞机去国外,这里的机场就是交换机。你可以在信的封面上写好,从xxx机场飞往国外。

路由键(Routing Key)、队列(Queue)、绑定(Binding):队列通过路由键绑定到交换机上。生产者把消息发送到交换机之后,交换机会根据消息上标明的路由键将消息发送到对应的队列中,由订阅这个队列的消费者进行接收。 可以在管理页面的Exchanges标签和Queues标签中查看交换机、队列及其绑定情况、监控信息等。 可以看着下面的图理解一下 RMQ大体流程结构

虚拟主机(VHost):虚拟主机可以理解为在一个整个mq的服务器中的一个内部的mq服务器,拥有自己的交换机、队列。其中最重要的功能就是可以根据虚拟主机来设置权限。 虚拟主机提供了将一个mq服务器划分成多个区域的功能,对于不同的客户端可以进行区分。在客户端连接mq服务器的时候必须指定虚拟主机,默认为:“/”,默认的用户和密码为guest。 默认情况下,guest用户是不能通过非本机的管理页面登陆,所以在使用的时候尽量分配一个新的用户给客户端使用。在新建用户的时候必须要指派至少一个虚拟主机,并且该用户只能访问指派的虚拟主机内的队列、交换机。

交换机类型

交换机一共有四种:direct,fanout,topic,headers,这里就讲主要使用的前三种交换机。

Direct交换机

Direct交换机是RabbitMQ默认的交换机,需要的是队列绑定的键和消息的路由键完全一致时,消息会被投递到对应队列。

Fanout交换机

不管队列绑定了什么路由键,只要有消息发送到Fanout交换机,所有绑定在该交换机上面的队列都会收到一份消息。

Topic交换机

Topic交换机是通过使用*/#通配符进行匹配处理。当我们把队列通过绑定键绑定到Topic交换机上的时候,可以通过.来分隔键,*/#来代表通配符。*匹配一个,#匹配一到多个。下面举例说明。

情况1:首先我们有三个队列,队列A通过绑定键 *.*.toyota绑定到交换机,队列B通过绑定键*.black.*绑定到交换机,队列C通过绑定键 fast.*.*绑定到交换机,对应就是可以理解为队列A关注所有丰田的汽车,队列B关注所有黑色的汽车,队列C关注所有跑得快的汽车。 使用路由键fast.black.toyota将消息发送到交换机,那么3个队列都会接收到消息,如果使用fast.white.toyota将消息发送到路由键,那么只有队列A/C接收到消息。 情况2:如果我们有两个队列,队列A通过绑定键 *.*.toyota绑定到交换机,队列B通过绑定键*.black.*和绑定键 fast.*.*绑定到交换机。 使用路由键fast.black.toyota将消息发送到交换机,虽然路由键匹配了队列B的两种绑定键,但是只会给B投递一次消息。 情况3:当一个队列以”#”作为绑定键时,它将接收所有消息,而不管路由键如何,类似于fanout型交换机。 情况4:队列A通过绑定键 *.toyota绑定到交换机,队列B通过绑定键#.toyota绑定到交换机。 使用路由键black.toyota将消息发送到交换机,队列A/B都能收到消息。 使用路由键fast.black.toyota将消息发送到交换机,只有队列B都能收到消息。

Headers交换机

头交换机类似于主题交换机,但是主题交换机是根据路由键来判断的,头交换机是根据消息属性来进行消息的分发。下面举例说明。

队列A:绑定交换机参数是:filetype=png,catagory=animal,x-match=all; 队列B:绑定交换机参数是:filetype=png,catagory=tree,x-match=any; 队列C:绑定交换机参数是:filetype=jpg,catagory=animal,x-match=all。

此时发送消息如下: 消息1发送交换机的头参数是:filetype=png,catagory=animal则消息传送到队列A; 消息2发送交换机的头参数是:filetype=png则消息传送到队列A和队列B; 消息3发送交换机的头参数是:filetype=jpg,catagory=tree则消息没有匹配队列,此消息会被丢弃。 值得注意的就是x-match这个特殊参数,他有两个取值: all:默认,表示header中所有的属性必须完全一致; any:只要有一个属性匹配成功就可以。

队列与交换机的多重绑定

队列在绑定到交换机上的时候是允许绑定多个路由键的,就想上面说的情况2一样,这就是多重绑定。就是说单个绑定的队列只能收到指定的路由键的消息,但是多重绑定的队列可以有多次匹配的机会,但是就算一个队列的多个绑定键都匹配了也就只会有一条消息。

速度与可靠性的权衡

RabbitMQ中对于交换机、队列、消息有很多可以配置的属性,每种机制对mq的性能都有一定的影响。一般来说,速度越快的可靠性越低,可靠性好的性能差,具体怎么使用就需要根据业务需求来,没有最好只有最合适。

发送消息

无保障

最基础的通过basicPublish发布消息,配置了正确的交换机和路由键,消息就会被接收并发送到合适的队列。但是一旦网络有问题,消息无法路由或者RabbitMQ自身出现错误,这种方式就会有风险。

失败通知

在生产者发送消息的时候设置mandatory标识,这样如果消息不可路由,会通知生产者失败。

1
2
3
4
5
6
7
8
9
10
11
12
//失败通知 回调
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replycode, String replyText, String exchange, String routeKey, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
String message = new String(bytes);
System.out.println("返回的message:" + message);
System.out.println("返回的replycode:" + replycode);
System.out.println("返回的replyText:" + replyText);
System.out.println("返回的exchange:" + exchange);
System.out.println("返回的routeKey:" + routeKey);
}
});

事务

事务的实现是通过对信道(Channel)的设置。在发送消息之前先对信道进行设置channel.txSelect()开启事务。然后发送消息之后使用channel.txComment()提交事务。 使用channel.txRollback()回滚事务。 注:事务对中间件的性能影响相当大,尽量不要使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 开启事务
channel.txSelect();
try {
for(int i=0; i<3; i++){
// 发送的消息
String message = "Hello_" + ( i + 1 ) + "_");
channel.basicPublish(EXCHANGE_NAME, routekey, true, null, message.getBytes());
System.out.println("发送消息" + message + "'");
Thread.sleep(200);
}
// 事务提交
channel.txCommit();
} catch (IOException e) {
e.printStackTrace();
// 事务回滚
channel.txRollback();
} catch (InterruptedException e) {
e.printStackTrace();
}

发送方确认

代替事务使用的就是发送方确认。发送方确认在信道上设置channel.confirmSelect()开启confirm模式,所有发布的信息都会有一个唯一的ID用于消息的确认。一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正常路由。一般出现错误是MQ发生内部错误导致消息丢失。 confirm有三种确认方式: 第一种channel.waitForConfirms():普通发送方确认,阻塞直到MQ返回成功。 第二种channel.waitForConfirmsOrDie():批量确认,阻塞直到MQ返回所有消息发送成功。 第三种channel.addConfirmListener():异步监听,设置一个监听类实现handleNack()和handleAck()方法即可。

普通方式:

1
2
3
4
5
6
7
8
9
// 开启发送者确认模式
channel.confirmSelect();
channel.basicPublish(EXCHANGE_NAME, ROUTE_KEY, true, null, message.getBytes());
// 确认是否成功(true成功)
if (channel.waitForConfirms()){
System.out.println("send success");
}else{
System.out.println("send failure");
}

批量确认:

1
2
3
4
5
6
7
8
9
// 开启发送者确认模式
channel.confirmSelect();
for (int i=0; i<10; i++){
// 发送消息
String message = "Hello_" + ( i + 1 );
channel.basicPublish("myexchange", ROUTE_KEY, true, null, message.getBytes());
}
// 启用发送者确认模式(批量确认)
channel.waitForConfirmsOrDie();

异步监听:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 启用发送者确认模式
channel.confirmSelect();
// 添加发送者确认监听器
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) {
System.out.println("确认成功:" + deliveryTag + ",multiple:" + multiple);
}

@Override
public void handleNack(long deliveryTag, boolean multiple) {
System.out.println("确认失败:" + deliveryTag + ",multiple:" + multiple);
}
});

// 添加失败者通知
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) {
String message = new String(body);
System.out.println("RabbitMQ路由失败:" + routingKey + "." + message);
}
});

备用交换机

在配置交换机的时候可以在交换机上配置alternate-exchange备用交换机。这样在主交换机无法路由消息的时候,消息将被路由到这个新的备用交换机。 如果发布消息的时候同时设置了失败通知,当主交换机不能路由消息的时候,不会直接通知生产者失败,而是会先路由到备用交换机。

1
2
3
4
5
6
7
// 声明备用交换机
Map<String,Object> argsMap = new HashMap<String,Object>();
argsMap.put("alternate-exchange",BAK_EXCHANGE_NAME);
// 主交换机
channel.exchangeDeclare("main.exchange", BuiltinExchangeType.DIRECT, false, false, argsMap);
// 备用交换机
channel.exchangeDeclare("bak.exchange", BuiltinExchangeType.FANOUT, true, false, null);

接收消息

拉取模式(Get)

拉取模式很简单,就是在程序里写个while(true)不断向RabbitMQ发送请求,如果有消息则返回,如没有消息则返回一个空。这种方式每次请求都需要建立连接,发出请求,而且对于RabbitMQ来说它并不知道程序会何时发出请求,一般不会使用这种方式。

1
2
3
4
5
6
7
8
9
10
while (true){
// 拉一条,第二个参数指是否自动确认。如果是自动确认的,那么RabbitMQ认为这条消息消费,从队列中删除)
GetResponse getResponse = channel.basicGet(queueName, false);
if (getResponse != null){
System.out.println("received[" + getResponse.getEnvelope().getRoutingKey() + "]" + new String(getResponse.getBody()));
}
// 确认(自动、手动)
channel.basicAck(0, true);
Thread.sleep(1000);
}

推送模式(Consume)

推送模式顾名思义就是由RabbitMQ来负责推送数据。当一个消费者注册到RabbitMQ之后,RabbitMQ在有消息进入队列的时候,会自动推送消息给消费者。

消息应答

消费者在收到一条消息之后需要进行确认,消息确认之后,RabbitMQ才会从队列中删除这一条消息。 消息确认分为自动确认和手动确认。 在声明队列的时候,可以指定autoAck参数,当autoAck为true时,一旦消费者收到了消息,那么RabbitMQ就默认确认了消息。如果消费者在处理这条消息的时候出了错,那么久没有什么办法可以重新处理这条消息了。所以在更多的时候需要我们在消费者成功处理完成消息之后再告诉RabbitMQ确认,这就是手动确认。 当autoAck=false的时候,RabbitMQ会等消费者发回ack信号之后才会从队列中移除消息。这样消费者就不用担心处理消息过程中消费者进程挂掉后消息丢失的问题。 如果服务器一直没有收到消费者的ack信号,并且消费此消息的消费者与RabbitMQ断开连接,则服务器就摸着这条消息处理失败,重新将消息放到队列中,等待投递给下一个连接上的消费者。

单条确认:

1
2
3
4
5
6
7
8
9
10
11
12
// 声明一个消费者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received["+envelope.getRoutingKey() + "]" + message);
// 单条确认,这边第二个参数是多条的意思,但是每次处理一个之后都会执行这个代码,所以还是等于单条确认。
channel.basicAck(envelope.getDeliveryTag(), true);
}
};
// 消费者正式开始在指定队列上消费消息
channel.basicConsume(queueName, false, consumer);

批量确认:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 批量确认,还是继承上面的DefaultConsumer,但是需要变量用来计数处理了多少个数据。
*/
public class BatchAckConsumer extends DefaultConsumer {
// 计数,处理了多少条数据
private int dealtMessageCount = 0;
public BatchAckConsumer(Channel channel) {
super(channel);
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("批量Received[" + envelope.getRoutingKey() + "]" + message);
meesageCount++;
// 批量确认 50一批
if (meesageCount % 50 == 0){
this.getChannel().basicAck(envelope.getDeliveryTag(), true);
System.out.println("批量进行消息的确认------------");
}
if ("stop".equals(message)){
// 如果是最后一条消息,则把剩余的消息都进行确认
this.getChannel().basicAck(envelope.getDeliveryTag(), true);
System.out.println("批量进行最后业务消息的确认---------");
}
}
}

消息拒绝

RabbitMQ消息拒绝可以使用Reject和Nack。当处理消息出现问题,需要让别的消费者处理的时候就需要使用到消息拒绝。Reject和Nack在拒绝消息的时候可以使用requeue标识,标识该消息失败后是否需要重新投递。Reject一次只能拒绝一条,Nack一次可以拒绝多条。 一般来说不太会做消息的拒绝,因为通常情况下处理消息出现问题,即使重新投递到其他消费者还是会同样的出现问题,这就需要根据实际情况来判断。如果是数据问题导致消息处理肯定会失败的情况,就不需要拒绝消息,而是需要将这条记录记录下来做另外的处理,如果是因为网络故障等意外情况,那么可以尝试重新投递。 在重新投递的情况下,重新投递的消息依然使用轮询的方式进行投递(等于一条新的消息)。 假设队列中有10条消息,有3个消费者ABC,其中两个AB可以正常处理消息。那么消费者C在处理第三条消息的时候就会失败,然后第三条消息被重新投递到队列。这样队列中就是还剩下顺序为4,5,6,7,8,9,10,3的7条消息。接下来AB处理完4,5两条消息后消息6也是一样被重新投递,同理消息9也一样。所以在10次投递之后,消费者A处理了1,4,7,10四条消息,消费者B处理了2,5,8三条消息,队列中还有3,6,9三条消息。此时轮询到了消费者B,消费者B处理消息3。然后发现消息6又来到了无法处理消息的消费者C,再次重新投递。最后就会由消费者A处理消息9,消费者B再把消息6处理掉。最后就是消费者A依次处理了1,4,7,10,9,消费者B依次处理了2,5,8,3,6。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 声明一个消费者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try{
String message = new String(body, "UTF-8");
System.out.println("Received[" + envelope.getRoutingKey() + "]" + message);
throw new RuntimeException("抛出异常" + message);
}catch (Exception e){
e.printStackTrace();
// 这里有两种方式拒绝
// Reject方式拒绝(第2个参数决定是否重新投递)
// channel.basicReject(envelope.getDeliveryTag(),true);
// Nack方式的拒绝(第2个参数决定是否批量,第3个参数决定是否重新投递)
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
};
// 消费者正式开始在指定队列上消费消息
channel.basicConsume(queueName, false, consumer);

QOS预取

在消费者在获取消息的时候,可以预先要求接受一定数量的消息。在处理完成一定量的消息之后进行批量确认。这种机制可以用来对RabbitMQ给消费者推送消息的时候进行限流。 举个例子:假设现在有3条消息在队列中,一个消费者A监听了这个队列,处理完成消息之后这个消费者A并没有进行ack,所以这时候即使有个新的消费者B来监听这个队列也不会获取到这3条消息,只有当消费者A被关闭(与RabbitMQ连接关闭)之后,消费者B才能正常获取到这3条消息。 如果我们开启了Qos预取模式进行流控,即在信道上设置属性channel.basicQos(2, false),就可以解决上面的问题。第一个参数为预取数量,第二个参数为是否为对信道进行属性设置。在上面的章节中提到,RabbitMQ服务端和客户端使用信道进行通信,所以在一个信道中可以有多个消费者。假设设置了如下限制:

1
2
channel.basicQos(8, false);
channel.basicQos(10, true);
那么在这个信道上,整个通道最多能有10条未确认消息,每个消费者最多有8个未确认消息。 继续上面的例子,在我们设置了预取两条消息之后,消费者A只能获取两条消息不确认。这时候如果有消费者B来监听这个队列,可以正常的获取到第三条消息。

消费者事务

消费者事务使用的方法和生产者的事务一样。但是在具体使用中和消息的确认有一定的关系。 如果开启了自动确认(autoAck=true),那么就不支持消费者事务,因为在消费者接收到消息的时候RabbitMQ就已经认为你成功确认并把消息移除了,此时再回滚也于事无补。 如果没有开启自动确认(autoAck=false),可以先进行消息确认ack,然后RabbitMQ会等待事务返回结果,如果事务提交,那么成功确认并移除消息,如果事务回滚,会以事务为准,消息重新投放进队列。

死信交换机

在上面消息拒绝说到出现异常的情况下,不一定是要进行消息重新投递到队列的,因为消息处理失败极有可能在第二次投递的时候依然会处理失败(即可能是数据或业务问题导致)。但是如果使用消息拒绝,又同时设置了requeue=false导致了消息丢失,这也是不可取的。所以这里有一个解决方案就是死信交换机。顾名思义,死信交换机就是用来处理“死掉的信息”。 如何定义“死掉的信息”,一般有3种情况: 1.消息被拒绝,并且requeue参数为false。 2.消息过期(默认消息不过期,但是可以设置队列和消息的过期时间)。 3.队列达到最长长度。

死信交换机其实就是一个普通的交换机,就是它的消息来源是主交换机投递到队列后“死亡的消息”。创建死信交换机/队列的方式和正常创建交换机/队列一样。主要是在创建原有队列的时候有不同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 创建死信交换机
channel.exchangeDeclare("dlx.exchange", BuiltinExchangeType.TOPIC);
// 创建对应的队列
channel.queueDeclare("dlx.queue", false, false, false, null);
// 绑定,将队列和死信交换机通过路由键进行绑定
channel.queueBind("dlx.queue", "dlx.exchange", "#");
// 声明消费者1,省略

// 创建普通交换机
channel.exchangeDeclare("normal.exchange",BuiltinExchangeType.TOPIC);
// 队列绑定死信交换机
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
// 可以修改消息的路由键,如果不修改还是原来的
args.put("x-dead-letter-routing-key", "errormsg");
// 声明队列,并且在队列上绑定死信交换机
channel.queueDeclare("normal.queue", false, false, false, args);
channel.queueBind("normal.queue", "normal.exchange", "#");
// 声明消费者2,省略
这样在声明完成消费者之后,往普通交换机发送一条消息并让消费者2拒绝之后,消息就会被重新投递到死信交换机,然后由消费者1处理消息。

这边值得提的有几下几点: 1.在我们声明普通队列绑定死信交换机的时候,并不要求死信交换机已经被声明,但是当有消息需要路由要死信交换机的时候,该交换机必须存在,否则消息将会丢失。 2.一个消息“死亡”之后走死信交换机进行路由的时候,在消息的头上会增加一个x-death数组,数组中保存该消息每经过一次死信的信息。经过一次死信交换机会给数组中增加一个节点,节点中保存有关死信的key-value信息如下: queue:消息成为死信时所在队列的名称 reason:消息成为死信的原因 time:消息成为死信的时间,是一个64位的时间戳 exchange:消息被发送的交换机 routing-keys:消息被发送时使用的路由关键字 count:在该队列中消息由于这个原因被死信路由的次数 original-expiration(如果消息是因为消息的TTL成为死信时,有该值):消息的原始过期时间属性,这个值在消息被死信路由时将被移除,为了避免消息在其他队列中再次过期

队列的控制

临时队列

临时队列就是没有持久化的队列,如果RabbitMQ服务器重启,那么这些队列将不复存在。

自动删除队列

自动删除队列相比普通队列就是在最后一个消费者断开连接的时候,会把该队列删除。

1
2
// 设置第4个参数为true即可
channel.queueDeclare("autodelete.queue", false, false, true, args);

单消费者队列

普通队列可以有多个消费者同时消费,此时采用轮询机制(上面已经讲过)。如果需要消费者独占一个队列,那么就需要对队列进行单独的设置。

1
2
// 设置第3个参数为true即可
channel.queueDeclare("autodelete.queue", false, true, false, args);

自动过期队列

自动过期队列就是队列在超过一段时间没有使用之后会被RabbitMQ删除。 对于“没有使用”的定义:没有get操作且没有消费者连接。 注意:有消息并不代表被使用。

1
2
3
4
5
// 设置额外参数
Map<String, Object> args = new HashMap<>();
args.put("x-expires", 20*1000); // 20s
// 声明队列的时候加入参数
channel.queueDeclare("autoexpire.queue", false, false, false, args);

永久队列

对于一个队列,如果是持久化队列那么队列会被保存在磁盘中,当RabbitMQ服务重启之后,该队列会保存在MQ中,而非持久化队列在RabbitMQ服务重启之后队列就回消失。

1
2
// 设置第2个参数为true即可
channel.queueDeclare("durable.queue", true, false, false, args);

队列级别的消息过期设置

消息的过期时间可以给消息设置,也可以给队列设置。只要给队列设置x-message-ttl参数,就可以设定该队列所有消息的存活时间。

1
2
3
4
5
// 设置额外参数
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 30*1000); // 30s
// 声明队列的时候加入参数
channel.queueDeclare("autoexpire.queue", false, false, false, args);

可以设置的队列额外参数

参数名用途
x-dead-letter-exchange死信交换机
x-dead-letter-routing-key死信消息的可选路由键
x-expires队列在指定毫秒数后被删除
x-ha-policy创建 HA 队列
x-ha-nodesHA 队列的分布节点
x-max-length队列的最大消息数
x-message-ttl毫秒为单位的消息过期时间,队列级别
x-max-priority最大优先值为 255 的队列优先排序功能
x-max-length对队列中消息的条数进行限制
x-max-length-bytes对队列中消息的总量进行限制

消息的控制

RabbitMQ对一个消息的大小控制为2GB。发布的消息中包括了 Basic.Publish、消息头帧(Basic.Properties)、消息体 消息头帧中包括了以下属性: |属性名|用途| |----|----| |content-type|消息的MIME类型,如application/json| |content-encoding|指示消息体使用某种特殊的方式进行压缩或编码| |message-id|消息的唯一标识| |correlation-id|关联消息的id,常用于消息的响应| |timestamp|消息创建的时间戳| |expiration|消息的过期时间戳| |delivery-mode|消息的持久化类型,1:非持久化;2:持久化| |app-id|应用程序的版本号| |user-id|和app-id一起来帮助追踪出现问题的消息发布者应用程序| |type|消息类型名称,由应用间自行规定使用| |reply-to|用于消息的回复| |headers|键值对表,由应用间自行规定使用| |priority|指定队列中的消息的优先级|

如持久化消息:

1
2
3
4
// 发布持久化的消息(delivery-mode=2)
channel.basicPublish(EXCHANGE_NAME, routekey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
// 这里的PERSISTENT_TEXT_PLAIN是一个new BasicProperties(xx, ...);
// 其中会把deliveryMode设置为2

Request-Response模式

这边举个回复消息的例子,消息的回复其实就是消费者在处理消息之后,通过响应队列给生产者发送一条消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// 生产者部分(一个生产者+一个消费者)
// 创建连接等操作
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 创建交换机
channel.exchangeDeclare("myexchange", "direct", false);

// 响应队列QueueName,消费者将会把要返回的信息发送到该Queue,名字就不指定了。
String responseQueue = channel.queueDeclare().getQueue();
// 消息的唯一id
String msgId = UUID.randomUUID().toString();
//设置消息中的应答属性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().replyTo(responseQueue)
.messageId(msgId)
.build();

// 声明了一个消费者,这个消费者用来接收响应队列里面的消息
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
String message = new String(body, "UTF-8");
System.out.println("接收到响应消息[" + envelope.getRoutingKey() + "]" + message);
}
};
// 消费者响应队列上的消息
channel.basicConsume(responseQueue, true, consumer);

String msg = "Hello";
// 发送消息时,把响应相关属性设置进去
channel.basicPublish("myexchange", "myroutingkey", properties, msg.getBytes());


// 消费者部分(一个消费者)
// 声明一个队列
String queueName = "normalqueue";
channel.queueDeclare(queueName, false, false, false, null);

// 绑定,将队列和交换机通过路由键进行绑定
String routekey = "myroutingkey";
channel.queueBind(queueName, "myexchange", routekey);
// 声明了一个消费者
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received[" + envelope.getRoutingKey() + "]" + message);
// 从消息中拿到相关属性(确定需要相应的队列,以及关联的原始消息)
AMQP.BasicProperties respProp = new AMQP.BasicProperties.Builder().replyTo(properties.getReplyTo())
.correlationId(properties.getMessageId())
.build();
// 消息消费时,同时需要生作为生产者生产消息(以OK为标识)
channel.basicPublish("", respProp.getReplyTo(), respProp, ("成功获取到消息:" + message).getBytes("UTF-8"));
}
};
// 消费者正式开始在指定队列上消费消息
channel.basicConsume(queueName,true,consumer);

至此,基本的RabbitMQ原生Java使用就都介绍到了,后面会介绍结合Spring的相关知识,希望对学习和使用的读者有所帮助。有问题请一定指出,欢迎交流!

本篇可能还会更新的内容:RabbitMQ高可用集群(HaProxy,镜像队列)