RabbitMQ使用指南3——RMQ与Spring结合


与Spring集成

这里用的Spring版本

1
2
3
4
5
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>4.3.11.RELEASE</version>
</dependency>
这里用的RabbitMQ结合Spring的版本
1
2
3
4
5
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>

这里把和springboot的结合放一放,先讲和spring的结合。毕竟使用xml配置和使用注解配置最终在代码上是一致的,即可以用xml配置的,一定可以用代码去实现。而且目前作者工作使用的项目还未使用springboot,所以就先讲讲xml配置了。

application-context.xml配置

命名空间

要使用xml配置RabbitMQ相关内容,首先要在spring的xml中引入RabbitMQ相关的Schema。

1
2
3
4
5
6
7
8
9
10
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
其中可以看到RabbitMQ相关的3个引用,在引用完成之后就可以在xml中配置RabbitMQ相关的内容了。

连接相关

使用Spring的配置文件配置连接,利用Spring对bean标签解析实例化创建RabbitMQ的连接工厂。

1
2
3
4
5
6
7
8
<!-- rabbitMQ配置 -->
<bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="127.0.0.1"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="8"/>
<property name="port" value="5672"/>
</bean>
property标签就是配置工厂类的变量,常用的几个列了一下。如果要手动从连接工厂获取连接可以使用createConnection()方法。

RabbitAdmin管理类

使用自定义标签配置管理器。

1
2
<!--Spring的rabbitmq admin 管理配置-->
<rabbit:admin connection-factory="rabbitConnectionFactory"/>
其中RabbitAdmin是RabbitMQ在Spring中的管理类,其中有创建队列、创建交换机、绑定等多种方法。RabbitAdmin实现了InitializingBean接口,在Spring容器初始化完成Bean之后会调用initializeBean(beanName, exposedObject, mbd);(详细见Spring源码中AbstractAutowireCapableBeanFactory的doCreateBean()方法),来调用到RabbitAdmin的afterPropertiesSet()方法。afterPropertiesSet()方法中又会调用本类中的initialize()方法。 其实贴一下initialize()方法源码就一目了然了,这边我把源码大概简化说明一下,源码贴在后面,大家对比一下很容易理解。

1
2
3
4
5
6
7
8
9
10
11
12
public void initialize() {
// 检查applicationcontext,applicationcontext是通过实现ApplicationContextAware接口的方法setApplicationContext获取的。

// 从Spring上下文中获取到所有的Exchange.class、Queue.class、Binding.class,也就是后面我们会配置的xml参数。

// processDeclarables方法从Spring上下文中获取Declarables类,这个类是个可以容纳Exchange,Queue,Binding的集合。这个类后面会提到。

// 第一个循环:对exchange的遍历,如果找到的交换机不是持久化或是自动删除的,会在日志中log.info记录。
// 第二个循环:对queue的遍历,如果找到的队列不是持久化或是自动删除的或是排他的(只能自己用),会在日志中log.info记录。

// 使用RabbitTemplate的channel,调用RabbitAdmin类中的declareExchanges、declareQueues、declareBindings对交换机、队列、绑定进行声明。
}

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe
* (but unnecessary) to call this method more than once.
*/
@Override // NOSONAR complexity
public void initialize() {
if (this.applicationContext == null) {
this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
return;
}
this.logger.debug("Initializing declarations");
Collection<Exchange> contextExchanges = new LinkedList<Exchange>(this.applicationContext.getBeansOfType(Exchange.class).values());
Collection<Queue> contextQueues = new LinkedList<Queue>(this.applicationContext.getBeansOfType(Queue.class).values());
Collection<Binding> contextBindings = new LinkedList<Binding>(this.applicationContext.getBeansOfType(Binding.class).values());
Collection<DeclarableCustomizer> customizers = this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values();

processDeclarables(contextExchanges, contextQueues, contextBindings);
final Collection<Exchange> exchanges = filterDeclarables(contextExchanges, customizers);
final Collection<Queue> queues = filterDeclarables(contextQueues, customizers);
final Collection<Binding> bindings = filterDeclarables(contextBindings, customizers);

for (Exchange exchange : exchanges) {
if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
this.logger.info("Auto-declaring a non-durable or auto-delete Exchange (" + exchange.getName() + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". " + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and " + "reopening the connection.");
}
}

for (Queue queue : queues) {
if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue (" + queue.getName() + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:" + queue.isExclusive() + ". " + "It will be redeclared if the broker stops and is restarted while the connection factory is " + "alive, but all messages will be lost.");
}
}

if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
this.logger.debug("Nothing to declare");
return;
}
this.rabbitTemplate.execute(channel -> {
declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
declareQueues(channel, queues.toArray(new Queue[queues.size()]));
declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
return null;
});
this.logger.debug("Declarations finished");
}

private void processDeclarables(Collection<Exchange> contextExchanges, Collection<Queue> contextQueues,
Collection<Binding> contextBindings) {
Collection<Declarables> declarables = this.applicationContext.getBeansOfType(Declarables.class, false, true)
.values();
declarables.forEach(d -> {
d.getDeclarables().forEach(declarable -> {
if (declarable instanceof Exchange) {
contextExchanges.add((Exchange) declarable);
}
else if (declarable instanceof Queue) {
contextQueues.add((Queue) declarable);
}
else if (declarable instanceof Binding) {
contextBindings.add((Binding) declarable);
}
});
});
}

队列

1
2
3
4
<!-- 定义队列 -->
<rabbit:queue name="queue1" durable="false"/>
<rabbit:queue name="queue2" durable="false"/>
<rabbit:queue name="queue3" durable="false"/>

在队列声明上,可以设置的常用属性: name:队列名称; durable:持久化(注意,持久化是需要交换机exchange、队列queue、消息message三者都是持久的); exclusive:仅创建者可以使用的私有队列,断开后自动删除; auto_delete:当所有消费客户端连接断开后,是否自动删除队列; id:允许独立于队列名称使用Spring进行管理。即可以使用ApplicationContext.getBean("id")来获取队列,并且能够使用属性占位符和SpEL表达式。

在标签queue里面,还可以设置更多详细的属性,具体属性可以看上一篇中的【队列的控制】。

1
2
3
4
5
6
7
8
9
10
<rabbit:queue name="queue4" durable="false" id="q4">
<rabbit:queue-arguments>
<!-- 设置队列内消息过期时间 -->
<entry key="x-message-ttl">
<value type="java.lang.Long">100</value>
</entry>
<!-- 创建 HA 队列 -->
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
在SpringFramework版本3.2及以后,对于详细属性可以使用更简便的配置方式:
1
<entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>

交换机

交换机的配置使用xxxx-exchange标签进行配置,可以设置的常用属性: name:交换机名称; durable:持久化; auto-delete:自动删除; id:独立于队列名称的id。

对于Fanout交换机是比较简单的

1
2
3
4
5
6
7
8
9
<!-- 把队列与Fanout交换机绑定一起 -->
<rabbit:fanout-exchange name="fanout-exchange" xmlns="http://www.springframework.org/schema/rabbit" durable="false">
<rabbit:bindings>
<rabbit:binding queue="queue1"></rabbit:binding>
<rabbit:binding queue="queue2"></rabbit:binding>
<rabbit:binding queue="queue3"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- fanout交换机 end-->

对于Direct/Topic交换机来说,队列需要通过绑定键绑定在交换机上。

1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- topic交换器 begin-->
<!-- 定义队列 -->
<!-- 把需要数据的队列通过路由键与topic交换器绑定一起 -->
<rabbit:topic-exchange name="topic-exchange"
xmlns="http://www.springframework.org/schema/rabbit" durable="false">
<rabbit:bindings>
<binding pattern="#" queue="all_cars"></binding>
<binding pattern="*.black" queue="all_black_cars"></binding>
<binding pattern="fast.white" queue="fast_black_cars"></binding>
<binding pattern="fast.*" queue="fast_all_cars"></binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- topic交换器 end-->

相信仔细的读者在上面讲管理类的时候会看到一个Declarables类,这个类可以包含实现了Declarable的接口的类的实例,而Queue,Exchange,Binding都是实现了这个接口的类。 对于上面对Queue,Exchange,Binding的xml配置,其实都可以使用@Bean的方式实现。这里就写一个使用@Bean注解和Declarable创建绑定的例子:

1
key.queue=red:redqueue,white:whitequeue
1
2
3
4
5
6
7
@Bean
public Declarables bindings(@Value("${key.queue}") List<String> keysAndQueues) {
return new Declarables(keysAndQueues.stream()
.map(keysAndQueue -> BindingBuilder.bind(keyAndQueue.split(":")[0]).to("myExchange")
.with(keyAndQueue.split(":")[1]))
.collect(Collectors.toList()));
}

对于Headers交换器这里就写一个例子:

1
2
3
4
5
6
7
8
9
10
11
<rabbit:headers-exchange name="headers-exchange">
<rabbit:bindings>
<rabbit:binding queue="header_png_animal">
<rabbit:binding-arguments>
<entrykey="filetype"value="png"/>
<entrykey="catagory"value="animal"/>
<entrykey="x-match"value="all"/>
</rabbit:binding-arguments>
</rabbit:binding>
</rabbit:bindings>
</rabbit:headers-exchange>

从1.6版本开始,ExChanges可以设置internal标志。这样的交换机是不能从客户端接收消息的,只能用于死信交换机、与交换机绑定的交换机。 spring-amqp中的注释如下:

1
2
3
4
5
6
/**
* Is an exchange internal; i.e. can't be directly published to by a client, used for exchange-to-exchange binding only.
* @return true if internal.
* @since 1.6
*/
boolean isInternal();

注意点:RabbitMQ broker 不允许使用不匹配的参数来声明队列。默认情况下如果出现异常,RabbitAdmin会立即停止所有声明的处理过程,会导致后面监听容器的声明失败。我们可以通过设置RabbitAdmin的ignore-declaration-exceptions为true来让管理类在遇到错误的时候记录并记录执行下面的声明。 可以看到RabbitAdmin类中的logOrRethrowDeclarationException方法中首先会调用publishDeclarationExceptionEvent发布一个DeclarationExceptionEvent事件,紧接着判断ignoreDeclarationExceptions变量,如果是false直接就抛出异常了。

消费者

消费者是使用的Spring容器创建,可以使用xml或者@Component注解方式创建。

1
<bean id="myConsumer1" class="com.zm.service.fanout.MyConsumer1"></bean>
1
2
3
4
5
6
7
8
@Component
public class MyConsumer1 implements MessageListener {
private Logger logger = LoggerFactory.getLogger(MyConsumer1.class);
@Override
public void onMessage(Message message) {
logger.info("Get message: " + new String(message.getBody()));
}
}

创建监听Listener

1
2
3
4
5
6
7
<!--监听容器-->
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<!-- 配置文件,消费者监听对应的队列(fanout类型交换器绑定的队列) -->
<rabbit:listener ref="myConsumer1" queues="q1" method="onMessage"/>
<rabbit:listener ref="myConsumer2" queues="q2" method="onMessage"/>
<rabbit:listener ref="myConsumer3" queues="q3" method="onMessage"/>
</rabbit:listener-container>
通过这样简单的配置就可以将消费者绑定到对应的队列上。同时在listener结点上,还可以设置的部分属性有: priority:优先级,设置该listener的消费者的优先级。 method:接收到消息的时候调用的方法,如果不设置,那么消费者类应该去实现MessageListener接口。 response-exchange、response-routing-key:如果消息不设置reply-to,但是设置了这两个属性,那么对于上面调用method方法返回的值,将会被发送到response-exchange设置的交换机。 原文注释: response-exchange: The name of the default response Exchange to send response messages to. This will be applied in case of a request message that does not carry a "replyTo" property. Note: This only applies to a listener method with a return value, for which each result object will be converted into a response message. response-routing-key: The routing key to send along with a response message.

那么这些消费者是如何绑定到队列上的呢?在上一篇文章说过,所有的操作其实都是调用基本的操作方法实现的。回忆一下,我们在上一篇文章写到消费者绑定到队列上使用的方法是channel.basicConsume(queueName, false, consumer);,那么我们在spring-rabbit的最终目标就是要找到这一行代码,并且了解它是如何被调用的。

首先,由于使用了xml的配置方式,那么就从xml配置入手。从spring.handles文件找到解析xml所需的所有类,可以在RabbitNamespaceHandler类中看到listener-container对应的解析类为ListenerContainerParser,见下面代码:

1
registerBeanDefinitionParser("listener-container", new ListenerContainerParser());

进入ListenerContainerParser类,很明显这个类实现了BeanDefinitionParser接口,Spring会调用这个接口的parse方法。我们找到实现了的parse方法,对于group属性我们先不说,那么在parse方法中会调用到这个listener-container中所有的listener的解析,见下面代码:

1
2
3
4
5
6
7
8
9
10
11
private static final String LISTENER_ELEMENT = "listener";

public BeanDefinition parse(Element element, ParserContext parserContext) {
// 解析group属性
...
// 获取所有的listener子元素,执行parseListener方法。
List<Element> childElements = DomUtils.getChildElementsByTagName(element, LISTENER_ELEMENT);
for (int i = 0; i < childElements.size(); i++) {
parseListener(childElements.get(i), element, parserContext, containerList);
}
}

熟悉Spring的应该都知道,在做初始化解析xml的时候,就是去根据xml配置的参数去创建一个个BeanDefinition,即创建一个用来描述一个类的类,然后后面Spring再根据这个BeanDefinition的描述去创建类。所以在对于各个listener的解析来说,就是要创建对应的各个BeanDefinition。根据代码我可以找到最终创建的BeanDefinition变量名称是containerDef,而他初始化的方法是使用的工具类RabbitNamespaceUtils,见下面代码:

1
2
3
4
5
6
7
8
9
10
private void parseListener(Element listenerEle, Element containerEle, ParserContext parserContext, ManagedList<RuntimeBeanReference> containerList) {
// 创建messageListener并设置属性
RootBeanDefinition listenerDef = new RootBeanDefinition();
...
// 创建消费者容器并设置属性
BeanDefinition containerDef = RabbitNamespaceUtils.parseContainer(containerEle, parserContext);
...
// 向Spring注册BeanDefinition
parserContext.registerBeanComponent(new BeanComponentDefinition(containerDef, containerBeanName));
}

进入这个parseContainer方法,可以看到RootBeanDefinition的构造函数中传入的是ListenerContainerFactoryBean.class,后面是将xml配置的参数给BeanDefinition赋值。