RabbitMQ使用指南1,2——简介和安装、原生Java客户端的使用
写在前面
之前自己学习了一下RMQ的使用,一直没有时间整理,现在抽空整理一下笔记。
主要内容
1.RabbitMQ简介和安装 这块主要是一个RabbitMQ所包含的内部元素的介绍(交换机、队列等等)。
2.原生Java客户端的使用 重要,主要理解是这一块。弄懂了原生的使用,基本上遇到问题也都能解决了。
2020-9-24 注:这边考虑了一下,模块1和模块2结合在一起比较好,所以总的就分两部分吧。
3.与Spring集成 其实就是把上一部分进行了封装,理解的话需要Spring相关知识,比如Bean的初始化,自动注入。使用比较无脑,cv就行。
RabbitMQ简介和安装
简介
正文开始,首先我们来了解一下什么MQ,为什么要用MQ。
什么是MQ?中文名消息中间件。一般来说是用于分布式系统中多个子系统之间高效的传递消息,主要关注的就是消息的接收与分发,用异步的方式完成各个子系统之间信息交流。基本上一般的MQ都能做到高效、可靠。
划重点:消息中间件不生产消息,只是消息的搬运工。
为什么要用MQ?之前所有业务都集成在一个系统里,渐渐地业务量提升、系统功能多样化,单个应用可能会导致更新困难、一旦某个功能挂了可能整个系统用不了,所以把一个系统拆分成多个子系统。那么这个时候就要解决子系统之间的通信问题。在信息量比较小的情况下,使用接口调用RPC是比较常见的方法,但是在信息量比较大的情况或者业务比较复杂涉及多个子系统的时候,就需要中间件来帮忙了。对于这种复杂的分布式系统,消息中间件可以降低子系统之间的耦合度(各个模块之间没有直接的调用,而是都通过MQ通信)、赋予子系统异步处理能力(预警系统要发短信,往MQ里扔一条消息就行了,可以接下去走其他逻辑,短信服务系统自己会从MQ中获取消息并发送短信)、赋予系统缓冲能力(突然要发几千几万条短信,调用方如果突然发几千几万个请求去短信服务让其发短信肯定会造成性能问题,可能直接就挂了。MQ就可以接受并把这些消息储存起来,就像一个蓄水池,做到“削峰平谷”)、赋予系统良好的扩展性(有新业务需要加入,比如本来订单系统需要给库存系统发送消息,现在又要发短信,只要多给MQ发一条给短信服务的消息,对原有的功能没有影响)、赋予系统良好的伸缩性(这个好理解,需要发短信的系统太多了,每分钟成千上万条,那当然就是加短信服务应用,处理不完就增加服务器好了)。
划重点:低耦合、异步通信、缓冲、伸缩性、扩展性。
应用场景
先略过吧。不是特别重要,后面的章节比较重要。
常用中间件比较
常用的4款MQ有:ActiveMQ、RabbitMQ、RocketMQ、Kafka。
ActiveMQ:性能最差,在业务量一定的情况下可以使用。 RabbitMQ:正统地结合AMQP协议,性能较好,比较成熟。但是底层是Erlang。 RocketMQ:阿里内部应用规模比较大,性能好,但是文档少,更新比较快(意思就是不太稳定、坑多,手动狗头)。要是Java技术强,公司有架构研发能力可以考虑。 Kafka:性能在百万级,大数据等领域可以使用。但是需要ZK支持。
这边的话。。ActiveMQ我就忽略了没看,RocketMQ大概了解了一下,Kafka也是没看,准备学了ZK再来看的,然后ZK的源码直接看晕了gg。所以只能讲讲RMQ了。
RabbitMQ安装
Linux安装
1 | su - root |
Windows安装
下载Erlang并安装。安装程序名字应该是otp_win64_xxx.exe
。 下载RabbitMQ-Server并安装。安装完之后在安装目录的rabbitmq_server-3.6.6\sbin
文件夹下会有对应的.bat
文件。
AMQP在RabbitMQ中的体现
客户端与RabbitMQ的通信
连接(Connections)
无论是生产者还是消费者,与RabbitMQ通信的时候都需要创建连接。只有创建完一条TCP连接,并且相互确认符合AMQP协议之后,才能在这个连接的基础上再进行其他操作,比如开始创建信道。 可以在RabbitMQ的管理界面中点击Connections查看所有建立的连接。
信道(Channels)
信道是在连接的基础上创建的一个个通信的“虚拟连接”。因为连接资源是宝贵的,不可能每个线程都创建一个连接,所以RabbitMQ使用了“多路复用”的技术,对于一些多线程执行多个任务的应用来说,在一个TCP连接中创建多个信道,每个信道对应一个线程。 可以在RabbitMQ的管理界面中点击Connections查看所有建立的连接(上图),图中每个连接里面有个Channels列就是创建的信道数。 可以在RabbitMQ的管理界面中点击Channels查看所有建立的信道。
RabbitMQ中的几个要素
生产者(Producer): 消息的创建者,发送消息到RabbitMQ。
消费者(Consumer): 连接到RabbitMQ,订阅某个队列,消费消息。
消息(Message):包含传输的数据信息和标签。标签用来确定消息发送给哪个队列。获取到消息的消费者只会得到传输的数据信息。
交换机(Exchanges):所有发送的消息都会确定一个它所到达的交换机,然后当消息被发送到RabbitMQ就会先到这个交换机。就像要从国内发一封信到国外,这封信是去浦东机场坐飞机去国外还是去虹桥机场坐飞机去国外,这里的机场就是交换机。你可以在信的封面上写好,从xxx机场飞往国外。
路由键(Routing Key)、队列(Queue)、绑定(Binding):队列通过路由键绑定到交换机上。生产者把消息发送到交换机之后,交换机会根据消息上标明的路由键将消息发送到对应的队列中,由订阅这个队列的消费者进行接收。 可以在管理页面的Exchanges标签和Queues标签中查看交换机、队列及其绑定情况、监控信息等。 可以看着下面的图理解一下
虚拟主机(VHost):虚拟主机可以理解为在一个整个mq的服务器中的一个内部的mq服务器,拥有自己的交换机、队列。其中最重要的功能就是可以根据虚拟主机来设置权限。 虚拟主机提供了将一个mq服务器划分成多个区域的功能,对于不同的客户端可以进行区分。在客户端连接mq服务器的时候必须指定虚拟主机,默认为:“/”,默认的用户和密码为guest。 默认情况下,guest用户是不能通过非本机的管理页面登陆,所以在使用的时候尽量分配一个新的用户给客户端使用。在新建用户的时候必须要指派至少一个虚拟主机,并且该用户只能访问指派的虚拟主机内的队列、交换机。
交换机类型
交换机一共有四种:direct,fanout,topic,headers,这里就讲主要使用的前三种交换机。
Direct交换机
Direct交换机是RabbitMQ默认的交换机,需要的是队列绑定的键和消息的路由键完全一致时,消息会被投递到对应队列。
Fanout交换机
不管队列绑定了什么路由键,只要有消息发送到Fanout交换机,所有绑定在该交换机上面的队列都会收到一份消息。
Topic交换机
Topic交换机是通过使用*
/#
通配符进行匹配处理。当我们把队列通过绑定键绑定到Topic交换机上的时候,可以通过.
来分隔键,*
/#
来代表通配符。*
匹配一个,#
匹配一到多个。下面举例说明。
情况1:首先我们有三个队列,队列A通过绑定键 *.*.toyota
绑定到交换机,队列B通过绑定键*.black.*
绑定到交换机,队列C通过绑定键 fast.*.*
绑定到交换机,对应就是可以理解为队列A关注所有丰田的汽车,队列B关注所有黑色的汽车,队列C关注所有跑得快的汽车。 使用路由键fast.black.toyota
将消息发送到交换机,那么3个队列都会接收到消息,如果使用fast.white.toyota
将消息发送到路由键,那么只有队列A/C接收到消息。 情况2:如果我们有两个队列,队列A通过绑定键 *.*.toyota
绑定到交换机,队列B通过绑定键*.black.*
和绑定键 fast.*.*
绑定到交换机。 使用路由键fast.black.toyota
将消息发送到交换机,虽然路由键匹配了队列B的两种绑定键,但是只会给B投递一次消息。 情况3:当一个队列以”#”作为绑定键时,它将接收所有消息,而不管路由键如何,类似于fanout型交换机。 情况4:队列A通过绑定键 *.toyota
绑定到交换机,队列B通过绑定键#.toyota
绑定到交换机。 使用路由键black.toyota
将消息发送到交换机,队列A/B都能收到消息。 使用路由键fast.black.toyota
将消息发送到交换机,只有队列B都能收到消息。
Headers交换机
头交换机类似于主题交换机,但是主题交换机是根据路由键来判断的,头交换机是根据消息属性来进行消息的分发。下面举例说明。
队列A:绑定交换机参数是:filetype=png,catagory=animal,x-match=all; 队列B:绑定交换机参数是:filetype=png,catagory=tree,x-match=any; 队列C:绑定交换机参数是:filetype=jpg,catagory=animal,x-match=all。
此时发送消息如下: 消息1发送交换机的头参数是:filetype=png,catagory=animal则消息传送到队列A; 消息2发送交换机的头参数是:filetype=png则消息传送到队列A和队列B; 消息3发送交换机的头参数是:filetype=jpg,catagory=tree则消息没有匹配队列,此消息会被丢弃。 值得注意的就是x-match这个特殊参数,他有两个取值: all:默认,表示header中所有的属性必须完全一致; any:只要有一个属性匹配成功就可以。
队列与交换机的多重绑定
队列在绑定到交换机上的时候是允许绑定多个路由键的,就想上面说的情况2一样,这就是多重绑定。就是说单个绑定的队列只能收到指定的路由键的消息,但是多重绑定的队列可以有多次匹配的机会,但是就算一个队列的多个绑定键都匹配了也就只会有一条消息。
速度与可靠性的权衡
RabbitMQ中对于交换机、队列、消息有很多可以配置的属性,每种机制对mq的性能都有一定的影响。一般来说,速度越快的可靠性越低,可靠性好的性能差,具体怎么使用就需要根据业务需求来,没有最好只有最合适。
发送消息
无保障
最基础的通过basicPublish发布消息,配置了正确的交换机和路由键,消息就会被接收并发送到合适的队列。但是一旦网络有问题,消息无法路由或者RabbitMQ自身出现错误,这种方式就会有风险。
失败通知
在生产者发送消息的时候设置mandatory标识,这样如果消息不可路由,会通知生产者失败。
1 | //失败通知 回调 |
事务
事务的实现是通过对信道(Channel)的设置。在发送消息之前先对信道进行设置channel.txSelect()开启事务。然后发送消息之后使用channel.txComment()提交事务。 使用channel.txRollback()回滚事务。 注:事务对中间件的性能影响相当大,尽量不要使用。
1 | // 开启事务 |
发送方确认
代替事务使用的就是发送方确认。发送方确认在信道上设置channel.confirmSelect()开启confirm模式,所有发布的信息都会有一个唯一的ID用于消息的确认。一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正常路由。一般出现错误是MQ发生内部错误导致消息丢失。 confirm有三种确认方式: 第一种channel.waitForConfirms():普通发送方确认,阻塞直到MQ返回成功。 第二种channel.waitForConfirmsOrDie():批量确认,阻塞直到MQ返回所有消息发送成功。 第三种channel.addConfirmListener():异步监听,设置一个监听类实现handleNack()和handleAck()方法即可。
普通方式:
1 | // 开启发送者确认模式 |
批量确认:
1 | // 开启发送者确认模式 |
异步监听:
1 | // 启用发送者确认模式 |
备用交换机
在配置交换机的时候可以在交换机上配置alternate-exchange备用交换机。这样在主交换机无法路由消息的时候,消息将被路由到这个新的备用交换机。 如果发布消息的时候同时设置了失败通知,当主交换机不能路由消息的时候,不会直接通知生产者失败,而是会先路由到备用交换机。
1 | // 声明备用交换机 |
接收消息
拉取模式(Get)
拉取模式很简单,就是在程序里写个while(true)不断向RabbitMQ发送请求,如果有消息则返回,如没有消息则返回一个空。这种方式每次请求都需要建立连接,发出请求,而且对于RabbitMQ来说它并不知道程序会何时发出请求,一般不会使用这种方式。
1 | while (true){ |
推送模式(Consume)
推送模式顾名思义就是由RabbitMQ来负责推送数据。当一个消费者注册到RabbitMQ之后,RabbitMQ在有消息进入队列的时候,会自动推送消息给消费者。
消息应答
消费者在收到一条消息之后需要进行确认,消息确认之后,RabbitMQ才会从队列中删除这一条消息。 消息确认分为自动确认和手动确认。 在声明队列的时候,可以指定autoAck参数,当autoAck为true时,一旦消费者收到了消息,那么RabbitMQ就默认确认了消息。如果消费者在处理这条消息的时候出了错,那么久没有什么办法可以重新处理这条消息了。所以在更多的时候需要我们在消费者成功处理完成消息之后再告诉RabbitMQ确认,这就是手动确认。 当autoAck=false的时候,RabbitMQ会等消费者发回ack信号之后才会从队列中移除消息。这样消费者就不用担心处理消息过程中消费者进程挂掉后消息丢失的问题。 如果服务器一直没有收到消费者的ack信号,并且消费此消息的消费者与RabbitMQ断开连接,则服务器就摸着这条消息处理失败,重新将消息放到队列中,等待投递给下一个连接上的消费者。
单条确认:
1 | // 声明一个消费者 |
批量确认:
1 | /** |
消息拒绝
RabbitMQ消息拒绝可以使用Reject和Nack。当处理消息出现问题,需要让别的消费者处理的时候就需要使用到消息拒绝。Reject和Nack在拒绝消息的时候可以使用requeue标识,标识该消息失败后是否需要重新投递。Reject一次只能拒绝一条,Nack一次可以拒绝多条。 一般来说不太会做消息的拒绝,因为通常情况下处理消息出现问题,即使重新投递到其他消费者还是会同样的出现问题,这就需要根据实际情况来判断。如果是数据问题导致消息处理肯定会失败的情况,就不需要拒绝消息,而是需要将这条记录记录下来做另外的处理,如果是因为网络故障等意外情况,那么可以尝试重新投递。 在重新投递的情况下,重新投递的消息依然使用轮询的方式进行投递(等于一条新的消息)。 假设队列中有10条消息,有3个消费者ABC,其中两个AB可以正常处理消息。那么消费者C在处理第三条消息的时候就会失败,然后第三条消息被重新投递到队列。这样队列中就是还剩下顺序为4,5,6,7,8,9,10,3的7条消息。接下来AB处理完4,5两条消息后消息6也是一样被重新投递,同理消息9也一样。所以在10次投递之后,消费者A处理了1,4,7,10四条消息,消费者B处理了2,5,8三条消息,队列中还有3,6,9三条消息。此时轮询到了消费者B,消费者B处理消息3。然后发现消息6又来到了无法处理消息的消费者C,再次重新投递。最后就会由消费者A处理消息9,消费者B再把消息6处理掉。最后就是消费者A依次处理了1,4,7,10,9,消费者B依次处理了2,5,8,3,6。
1 | // 声明一个消费者 |
QOS预取
在消费者在获取消息的时候,可以预先要求接受一定数量的消息。在处理完成一定量的消息之后进行批量确认。这种机制可以用来对RabbitMQ给消费者推送消息的时候进行限流。 举个例子:假设现在有3条消息在队列中,一个消费者A监听了这个队列,处理完成消息之后这个消费者A并没有进行ack,所以这时候即使有个新的消费者B来监听这个队列也不会获取到这3条消息,只有当消费者A被关闭(与RabbitMQ连接关闭)之后,消费者B才能正常获取到这3条消息。 如果我们开启了Qos预取模式进行流控,即在信道上设置属性channel.basicQos(2, false)
,就可以解决上面的问题。第一个参数为预取数量,第二个参数为是否为对信道进行属性设置。在上面的章节中提到,RabbitMQ服务端和客户端使用信道进行通信,所以在一个信道中可以有多个消费者。假设设置了如下限制:
1 | channel.basicQos(8, false); |
消费者事务
消费者事务使用的方法和生产者的事务一样。但是在具体使用中和消息的确认有一定的关系。 如果开启了自动确认(autoAck=true),那么就不支持消费者事务,因为在消费者接收到消息的时候RabbitMQ就已经认为你成功确认并把消息移除了,此时再回滚也于事无补。 如果没有开启自动确认(autoAck=false),可以先进行消息确认ack,然后RabbitMQ会等待事务返回结果,如果事务提交,那么成功确认并移除消息,如果事务回滚,会以事务为准,消息重新投放进队列。
死信交换机
在上面消息拒绝说到出现异常的情况下,不一定是要进行消息重新投递到队列的,因为消息处理失败极有可能在第二次投递的时候依然会处理失败(即可能是数据或业务问题导致)。但是如果使用消息拒绝,又同时设置了requeue=false
导致了消息丢失,这也是不可取的。所以这里有一个解决方案就是死信交换机。顾名思义,死信交换机就是用来处理“死掉的信息”。 如何定义“死掉的信息”,一般有3种情况: 1.消息被拒绝,并且requeue参数为false。 2.消息过期(默认消息不过期,但是可以设置队列和消息的过期时间)。 3.队列达到最长长度。
死信交换机其实就是一个普通的交换机,就是它的消息来源是主交换机投递到队列后“死亡的消息”。创建死信交换机/队列的方式和正常创建交换机/队列一样。主要是在创建原有队列的时候有不同。
1 | ConnectionFactory factory = new ConnectionFactory(); |
这边值得提的有几下几点: 1.在我们声明普通队列绑定死信交换机的时候,并不要求死信交换机已经被声明,但是当有消息需要路由要死信交换机的时候,该交换机必须存在,否则消息将会丢失。 2.一个消息“死亡”之后走死信交换机进行路由的时候,在消息的头上会增加一个x-death数组,数组中保存该消息每经过一次死信的信息。经过一次死信交换机会给数组中增加一个节点,节点中保存有关死信的key-value信息如下: queue:消息成为死信时所在队列的名称 reason:消息成为死信的原因 time:消息成为死信的时间,是一个64位的时间戳 exchange:消息被发送的交换机 routing-keys:消息被发送时使用的路由关键字 count:在该队列中消息由于这个原因被死信路由的次数 original-expiration(如果消息是因为消息的TTL成为死信时,有该值):消息的原始过期时间属性,这个值在消息被死信路由时将被移除,为了避免消息在其他队列中再次过期
队列的控制
临时队列
临时队列就是没有持久化的队列,如果RabbitMQ服务器重启,那么这些队列将不复存在。
自动删除队列
自动删除队列相比普通队列就是在最后一个消费者断开连接的时候,会把该队列删除。
1 | // 设置第4个参数为true即可 |
单消费者队列
普通队列可以有多个消费者同时消费,此时采用轮询机制(上面已经讲过)。如果需要消费者独占一个队列,那么就需要对队列进行单独的设置。
1 | // 设置第3个参数为true即可 |
自动过期队列
自动过期队列就是队列在超过一段时间没有使用之后会被RabbitMQ删除。 对于“没有使用”的定义:没有get操作且没有消费者连接。 注意:有消息并不代表被使用。
1 | // 设置额外参数 |
永久队列
对于一个队列,如果是持久化队列那么队列会被保存在磁盘中,当RabbitMQ服务重启之后,该队列会保存在MQ中,而非持久化队列在RabbitMQ服务重启之后队列就回消失。
1 | // 设置第2个参数为true即可 |
队列级别的消息过期设置
消息的过期时间可以给消息设置,也可以给队列设置。只要给队列设置x-message-ttl参数,就可以设定该队列所有消息的存活时间。
1 | // 设置额外参数 |
可以设置的队列额外参数
参数名 | 用途 |
---|---|
x-dead-letter-exchange | 死信交换机 |
x-dead-letter-routing-key | 死信消息的可选路由键 |
x-expires | 队列在指定毫秒数后被删除 |
x-ha-policy | 创建 HA 队列 |
x-ha-nodes | HA 队列的分布节点 |
x-max-length | 队列的最大消息数 |
x-message-ttl | 毫秒为单位的消息过期时间,队列级别 |
x-max-priority | 最大优先值为 255 的队列优先排序功能 |
x-max-length | 对队列中消息的条数进行限制 |
x-max-length-bytes | 对队列中消息的总量进行限制 |
消息的控制
RabbitMQ对一个消息的大小控制为2GB。发布的消息中包括了 Basic.Publish、消息头帧(Basic.Properties)、消息体 消息头帧中包括了以下属性: |属性名|用途| |----|----| |content-type|消息的MIME类型,如application/json| |content-encoding|指示消息体使用某种特殊的方式进行压缩或编码| |message-id|消息的唯一标识| |correlation-id|关联消息的id,常用于消息的响应| |timestamp|消息创建的时间戳| |expiration|消息的过期时间戳| |delivery-mode|消息的持久化类型,1:非持久化;2:持久化| |app-id|应用程序的版本号| |user-id|和app-id一起来帮助追踪出现问题的消息发布者应用程序| |type|消息类型名称,由应用间自行规定使用| |reply-to|用于消息的回复| |headers|键值对表,由应用间自行规定使用| |priority|指定队列中的消息的优先级|
如持久化消息:
1 | // 发布持久化的消息(delivery-mode=2) |
Request-Response模式
这边举个回复消息的例子,消息的回复其实就是消费者在处理消息之后,通过响应队列给生产者发送一条消息。
1 | // 生产者部分(一个生产者+一个消费者) |
至此,基本的RabbitMQ原生Java使用就都介绍到了,后面会介绍结合Spring的相关知识,希望对学习和使用的读者有所帮助。有问题请一定指出,欢迎交流!
本篇可能还会更新的内容:RabbitMQ高可用集群(HaProxy,镜像队列)