1. 程式人生 > >RabbitMQ的Java應用(2) -- 使用Spring AMQP開發消費者應用

RabbitMQ的Java應用(2) -- 使用Spring AMQP開發消費者應用

前一篇中我們介紹了使用RabbitMQ Java Client訪問RabbitMQ的方法。但是使用這種方式訪問RabbitMQ,開發者在程式中需要自己管理Connection,Channel物件,Consumer物件的建立,銷燬,這樣會非常不方便。我們下面介紹使用Spring AMQP連線RabbitMQ,進行訊息的接收和傳送。

Spring AMQP是一個Spring子專案,它提供了訪問基於AMQP協議的訊息伺服器的解決方案。它包含兩部分,spring-ampq是基於AMQP協議的訊息傳送和接收的高層實現,spring-rabbit是基於RabbitMQ的具體實現。這兩部分我們下面都會使用到。

Spring-AMQP中的基礎類/介面

spring-amqp中定義了幾個基礎類/介面,Message,Exchange,Queue,Binding

Message

public class Message implements Serializable 
{
  private final MessageProperties messageProperties;
 
  private final byte[] body;
spring-amqp中的Message類類似於javax的Message類,封裝了訊息的Properties和訊息體。


Exchange

spring-amqp定義了Exchange介面

[java] view plain copy
<code class="language-java">public interface Exchange extends Declarable {  
        //Exchange名稱  
    String getName();  
        //Exchange的型別  
    String getType();  
        //Exchange是否持久化  
    boolean isDurable();  
        //Exchange不再被使用時(沒有任何繫結的情況下),是否由RabbitMQ自動刪除  
    boolean isAutoDelete();  
        //Exchange相關的引數  
    Map<String, Object> getArguments();</code>  


這個介面和RabbitMQ Client中的Exchange類相似。 spring-amqp中的Exchange繼承關係如下圖所示


AbstractExchange類是所有Exchange類的父類,實現Exchange介面的具體方法。 CustomExchange針對使用者自定義的Exchange物件。其他四個Exchange類,分別對應四種Exchange。 我們在Spring配置檔案中配置Exchange物件時,使用的就是這幾種Exchange類。

Queue

spring-amqp定義了Queue類,和RabbitMQ Client中的Queue相似,對應RabbitMQ中的訊息佇列。

public class Queue extends AbstractDeclarable {
 
    private final String name;
 
    private final boolean durable;
 
    private final boolean exclusive;
 
    private final boolean autoDelete;
 
    private final java.util.Map<java.lang.String, java.lang.Object> arguments;
 
        public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
        this(name, durable, exclusive, autoDelete, null);
    } 

Binding

Binding類是對RabbitMQ中Exchange-Exchange以及Exchange-Queue繫結關係的抽象。

public class Binding extends AbstractDeclarable 
{
 
    public enum DestinationType {
        QUEUE, EXCHANGE;
    }
 
    private final String destination;
 
    private final String exchange;
 
    private final String routingKey;
 
    private final Map<String, Object> arguments;
 
    private final DestinationType destinationType;
 
    public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
            Map<String, Object> arguments) {
        this.destination = destination;
        this.destinationType = destinationType;
        this.exchange = exchange;
        this.routingKey = routingKey;
        this.arguments = arguments;
    }

對照RabbitMQ Java Client中Channel介面的queueBind和ExchangeBind方法

Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) 
 
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)

我們可以看出Binding類實際是對底層建立的Exchange-Queue和Exchange-Exchange繫結關係的高層抽象記錄類,它使用列舉型別DestinationType區分Exchange-Queue和Exchange-Exchange兩種繫結。

Spring AMQP搭建消費者應用

消費者應用程式框架搭建

我們接下來使用spring-amqp搭建一個RabbitMQ的消費者Web應用,我們先建立一個maven webapp應用程式,再新增一個dependency。

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.6.5.RELEASE</version>
 </dependency> 

spring-rabbit庫的引入是為了使用它裡面的RabbitAdmin類,建立Exchange,Queue和Binding物件,在匯入這個庫的時候同時引入了 spring-ampq和rabbitmq-client的庫,不需要另行匯入。

在src/main/resources目錄下建立application.properties檔案,用於記錄RabbitMQ的配置資訊。

mq.ip=localhost
mq.port=5672
mq.userName=rabbitmq_consumer
mq.password=123456
mq.virutalHost=test_vhosts
在src/main/resource目錄下建立applicationContext.xml檔案:
<?xml version="1.0" encoding="UTF-8"?>
 
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
         http://www.springframework.org/schema/util
         http://www.springframework.org/schema/util/spring-util-4.0.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context-4.0.xsd" >
 
    <context:annotation-config/>
 
    <context:property-placeholder
            ignore-unresolvable="true" location="classpath*:/application.properties" />
 
    <!--從RabbitMQ Java Client建立RabbitMQ連線工廠物件-->
    <bean id="rabbitMQConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
        <property name="username" value="${mq.userName}" />
        <property name="password" value="${mq.password}" />
        <property name="host" value="${mq.ip}" />
        <property name="port" value="${mq.port}" />
        <property name="virtualHost" value="${mq.virutalHost}" />
    </bean>
 
    <!--基於RabbitMQ連線工廠物件構建spring-rabbit的連線工廠物件Wrapper-->
    <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
         <constructor-arg name="rabbitConnectionFactory" ref="rabbitMQConnectionFactory" />
    </bean>
 
    <!--構建RabbitAmdin物件,它負責建立Queue/Exchange/Bind物件-->
    <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
        <constructor-arg name="connectionFactory" ref="connectionFactory" />
        <property name="autoStartup" value="true"></property>
    </bean>
 
    <!--構建Rabbit Template物件,用於傳送RabbitMQ訊息,本程式使用它傳送返回訊息-->
    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <constructor-arg name="connectionFactory" ref="connectionFactory" />
    </bean>
 
    <!--RabbitMQ訊息轉化器,用於將RabbitMQ訊息轉換為AMQP訊息,我們這裡使用基本的Message Converter -->
    <bean id="serializerMessageConverter"
          class="org.springframework.amqp.support.converter.SimpleMessageConverter" />
 
    <!--Message Properties轉換器,用於在spring-amqp Message物件中的Message Properties和RabbitMQ的
     Message Properties物件之間互相轉換 -->      
    <bean id="messagePropertiesConverter"
          class="org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter" />      
 
    <!--定義AMQP Queue-->
    <bean id="springMessageQueue" class="org.springframework.amqp.core.Queue">
        <constructor-arg name="name" value="springMessageQueue" />
        <constructor-arg name="autoDelete" value="false" />
        <constructor-arg name="durable" value="true" />
        <constructor-arg name="exclusive" value="false" />
        <!--定義AMQP Queue建立所需的RabbitAdmin物件-->
        <property name="adminsThatShouldDeclare" ref="rabbitAdmin" />
        <!--判斷是否需要在連線RabbitMQ後建立Queue-->
        <property name="shouldDeclare" value="true" />
    </bean>
 
    <!--定義AMQP Exchange-->
    <bean id="springMessageExchange" class="org.springframework.amqp.core.DirectExchange">
        <constructor-arg name="name" value="springMessageExchange" />
        <constructor-arg name="durable" value="true" />
        <constructor-arg name="autoDelete" value="false" />
        <!--定義AMQP Queue建立所需的RabbitAdmin物件-->
        <property name="adminsThatShouldDeclare" ref="rabbitAdmin" />
        <!--判斷是否需要在連線RabbitMQ後建立Exchange-->
        <property name="shouldDeclare" value="true" />
    </bean>
 
    <util:map id="emptyMap" map-class="java.util.HashMap" />
 
    <!--建立Exchange和Queue之間的Bind-->
    <bean id="springMessageBind" class="org.springframework.amqp.core.Binding">
        <constructor-arg name="destination" value="springMessageQueue" />
        <constructor-arg name="destinationType" value="QUEUE" />
        <constructor-arg name="exchange" value="springMessageExchange" />
        <constructor-arg name="routingKey" value="springMessage" />
        <constructor-arg name="arguments" ref="emptyMap" />
    </bean>
 
    <!--偵聽springMessageQueue佇列訊息的Message Listener-->
    <bean id="consumerListener" 
        class="com.qf.rabbitmq.listener.RabbitMQConsumer" />
 
    <!--建立偵聽springMessageQueue佇列的Message Listener Container-->
    <bean id="messageListenerContainer"
          class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="messageConverter" ref="serializerMessageConverter" />
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="messageListener" ref="consumerListener" />
        <property name="queues" ref="springMessageQueue" />
        <!--設定訊息確認方式為自動確認-->
        <property name="acknowledgeMode" value="AUTO" />
    </bean>
</beans>
我們定義了偵聽訊息佇列的Message Listener類RabbitMQConsumer
public class RabbitMQConsumer implements MessageListener
{
    @Autowired
    private MessagePropertiesConverter messagePropertiesConverter;
 
    @Override
    public void onMessage(Message message)
    {
        try 
        {
             //spring-amqp Message物件中的Message Properties屬性
             MessageProperties messageProperties = message.getMessageProperties();             
             //使用Message Converter將spring-amqp Message物件中的Message Properties屬性
             //轉換為RabbitMQ 的Message Properties物件
             AMQP.BasicProperties rabbitMQProperties =
                 messagePropertiesConverter.fromMessageProperties(messageProperties, "UTF-8");             
             System.out.println("The message's correlationId is:" + rabbitMQProperties.getCorrelationId());
             String messageContent = null;
             messageContent = new String(message.getBody(),"UTF-8");
             System.out.println("The message content is:" + messageContent);
        } 
        catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}
上面的Listener類是實現了MessageListener介面的類,當容器接收到訊息後,會自動觸發onMessage方法。 如果我們想使用普通的POJO類作為Message Listener,需要引入org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter類
public class MessageListenerAdapter extends AbstractAdaptableMessageListener {
 
  public MessageListenerAdapter(Object delegate) {
        doSetDelegate(delegate);
    }
}
這裡的delegate物件就是我們的POJO物件。 假設我們定義一個Delegate類ConsumerDelegate
public class ConsumerDelegate
{
    public void processMessage(Object message)
    {
       //這裡接收的訊息物件僅是訊息體,不包含MessageProperties
       //如果想獲取帶MessageProperties的訊息物件,需要在Adpater中
       //定義MessageConverter屬性。
       String messageContent = message.toString();
       System.out.println(messageContent);
    }
}
在applicationContext.xml中定義Adapter物件,引用我們的Delegate物件。
 <bean id="consumerDelegate"
          class="com.qf.rabbitmq.listener.ConsumerDelegate" />
 
 <bean id="consumerListenerAdapter"
          class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="consumerDelegate" />
        <!--指定delegate處理訊息的預設方法 -->
        <property name="defaultListenerMethod" value="processMessage" />
 </bean>
最後將Message Listener Container中的Message Listener指向Adapter物件。
<bean id="messageListenerContainer"
          class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="messageConverter" ref="serializerMessageConverter" />
        <property name="connectionFactory" ref="connectionFactory" />
        <!--設定Message Listener為Adapter物件 -->
        <property name="messageListener" ref="consumerListenerAdapter"/>
        <property name="queues" ref="springMessageQueue" />
        <property name="acknowledgeMode" value="AUTO" />
 </bean>
啟動Web應用後,我們從啟動日誌資訊可以看出應用連線上了RabbitMQ伺服器


從RabbitMQ的管理介面(用rabbitmq_consumer使用者登入)可以看到springMessageExchange和springMessageQueue已經建立,繫結關係也已經建立。


Consumer Tag自定義

連線springMessageQueue的消費者Tag是RabbitMQ隨機生成的Tag名

如果我們想設定消費者Tag為指定Tag,我們可以在Message Listener Container中 設定自定義consumer tag strategy。首先我們需要定義一個Consumer Tag Strategy類,它實現了ConsumerTagStrategy介面。

public class CustomConsumerTagStrategy implements ConsumerTagStrategy
{
    @Override
    public String createConsumerTag(String queue) {
        String consumerName = "Consumer1";
        return consumerName + "_" + queue;
    }
}
在applicationContext.xml中設定自定義ConsumerTagStrategy
<bean id="consumerTagStrategy" class="com.qf.rabbitmq.strategy.CustomConsumerTagStrategy" />
 <!--建立偵聽springMessageQueue佇列的Message Listener Container-->
 <bean id="messageListenerContainer"
          class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
     <property name="messageConverter" ref="serializerMessageConverter" />
     <property name="connectionFactory" ref="connectionFactory" />
     <property name="messageListener" ref="consumerListener" />
     <property name="queues" ref="springMessageQueue" />
     <property name="acknowledgeMode" value="AUTO" />
     <property name="consumerTagStrategy" ref="consumerTagStrategy" />
  </bean>

再次啟動Web應用,檢視RabbitMQ管理介面,我們可以看到Consumer Tag已經變成“Consumer1_springMessageQueue”,正如我們在CustomConsumerTagStrategy中設定的那樣。

消費者應用接收訊息驗證

我們編寫了一個生產者程式,向springMessageExchange傳送訊息。 生產者的主要程式碼如下,由於Exchange,Queue,Bind已經由消費者Web應用建立,因此生產者程式不再建立。

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("rabbitmq_producer");
factory.setPassword("123456");
factory.setVirtualHost("test_vhosts");
 
//建立與RabbitMQ伺服器的TCP連線
connection  = factory.newConnection();
channel = connection.createChannel();
 
String message = "First Web RabbitMQ Message";
 
String correlationId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(correlationId)
                    .build();
 
channel.basicPublish("springMessageExchange","springMessage", props, message.getBytes());

啟動消費者Web應用,從控制檯輸出資訊可以看到消費者接收到了生產者傳送的訊息。

設定訊息手動確認模式

到目前為止,消費者端的Web應用對訊息的確認是自動確認模式,如果我們想改為手動確認方式,需要做以下兩點改動:

1)修改applicationContext.xml檔案中Message Listener Container的acknowledgeMode屬性的值為MANUAL。


<bean id="messageListenerContainer"
          class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    ......
    <property name="acknowledgeMode" value="MANUAL" /> 
</bean>
2)將自定義的Message Listener類從實現org.springframework.amqp.core.MessageListener介面,改為實現 org.springframework.amqp.rabbit.core.ChannelAwareMessageListener介面,實現它的 onMessage(Message,Channel)方法。


public class RabbitMQConsumer implements ChannelAwareMessageListener
{
    ...........
 
    @Override
    public void onMessage(Message message, Channel channel) 
    {
        try 
        {
             //spring-amqp Message物件中的Message Properties屬性
             MessageProperties messageProperties = message.getMessageProperties();             
             //使用Message Converter將spring-amqp Message物件中的Message Properties屬性
             //轉換為RabbitMQ 的Message Properties物件
             AMQP.BasicProperties rabbitMQProperties =
                     messagePropertiesConverter.fromMessageProperties(messageProperties, "UTF-8");             
             System.out.println("The message's correlationId is:" + rabbitMQProperties.getCorrelationId());
             String messageContent = null;
             messageContent = new String(message.getBody(),"UTF-8");
             System.out.println("The message content is:" + messageContent);
             channel.basicAck(messageProperties.getDeliveryTag(), false);
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }
}

onMessage方法的最後一句程式碼呼叫Channel.basicAck方法對訊息進行手動確認。再次執行生產者和消費者程式後,我們登入管理介面,從管理介面中可以看到springMessageQueue佇列中未確認訊息條數 (圖中Unacked列)為0條,說明消費者接收訊息後已經手動確認。

RPC模式設定

如果生產者和消費者Web應用之間使用RPC模式,即消費者接收訊息後要向指定Exchange/Queue傳送返回訊息,我們需要修改生產者和消費者的程式。 消費者程式修改點如下:

1)在applicationContext.xml中定義返回訊息對應的Exchange,Queue和Bind。

<!--定義AMQP Reply Queue-->
<bean id="springReplyMessageQueue" class="org.springframework.amqp.core.Queue">
        <constructor-arg name="name" value="springReplyMessageQueue" />
        <constructor-arg name="autoDelete" value="false" />
        <constructor-arg name="durable" value="true" />
        <constructor-arg name="exclusive" value="false" />
        <property name="adminsThatShouldDeclare" ref="rabbitAdmin" />
        <property name="shouldDeclare" value="true" />
    </bean>
 
    <!--定義AMQP Reply Exchange-->
    <bean id="springReplyMessageExchange" class="org.springframework.amqp.core.DirectExchange">
        <constructor-arg name="name" value="springReplyMessageExchange" />
        <constructor-arg name="durable" value="true" />
        <constructor-arg name="autoDelete" value="false" />
        <!--定義AMQP Queue建立所需的RabbitAdmin物件-->
        <property name="adminsThatShouldDeclare" ref="rabbitAdmin" />
        <property name="shouldDeclare" value="true" />
    </bean>
 
    <!--建立Reply Exchange和Reply Queue之間的Bind-->
    <bean id="springReplyMessageBind" class="org.springframework.amqp.core.Binding">
        <constructor-arg name="destination" value="springReplyMessageQueue" />
        <constructor-arg name="destinationType" value="QUEUE" />
        <constructor-arg name="exchange" value="springReplyMessageExchange" />
        <constructor-arg name="routingKey" value="springReplyMessage" />
        <constructor-arg name="arguments" ref="emptyMap" />
</bean>
2)修改自定義Message Listener類的onMessage方法,添加發送返回訊息的程式碼
public void onMessage(Message message, Channel channel) {
try 
  {
    ......................
    String replyMessageContent = "Consumer1 have received the message '" + messageContent + "'";
    channel.basicPublish(rabbitMQProperties.getReplyTo(), "springReplyMessage",
    rabbitMQProperties, replyMessageContent.getBytes());
    ......................

這裡傳送返回訊息直接使用接收訊息時建立的Channel通道,不過如果我們的Message Listener類是繼承自MessageListener介面,無法獲得Channel物件時,我們需要使用RabbitTemplate物件進行返回訊息的傳送(我們前面已經在applicationContext.xml中定義了這個物件)
public class RabbitMQConsumer implements MessageListener

   @Autowired
   private MessagePropertiesConverter messagePropertiesConverter;
 
   @Autowired
   private RabbitTemplate rabbitTemplate;
 
   @Override
   public void onMessage(Message message) 
   {
    ..........
    //建立返回訊息的RabbitMQ Message Properties
    AMQP.BasicProperties replyRabbitMQProps =
             new AMQP.BasicProperties("text/plain",
                           "UTF-8",
                            null,
                            2,
                            0, rabbitMQProperties.getCorrelationId(), null, null,
                            null, null, null, null,
                            null, null);
    //建立返回訊息的信封頭
    Envelope replyEnvelope =
             new Envelope(messageProperties.getDeliveryTag(), true, 
                           "springReplyMessageExchange", "springReplyMessage");
 
    //建立返回訊息的spring-amqp Message Properties屬性
    MessageProperties replyMessageProperties =
             messagePropertiesConverter.toMessageProperties(replyRabbitMQProps, 
                           replyEnvelope,"UTF-8");
 
    //構建返回訊息(spring-amqp訊息)
    Message replyMessage = MessageBuilder.withBody(replyMessageContent.getBytes())
                                         .andProperties(replyMessageProperties)
                                         .build();
 
    rabbitTemplate.send("springReplyMessageExchange","springReplyMessage", replyMessage); 
生產者程式新增對返回訊息佇列偵聽的Consumer
String correlationId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(correlationId)
                    .replyTo("springReplyMessageExchange")
                    .build();
 
channel.basicPublish("springMessageExchange","springMessage", props, message.getBytes());
 
QueueingConsumer replyCustomer = new QueueingConsumer(channel);
channel.basicConsume("springReplyMessageQueue",true,"Producer Reply Consumer", replyCustomer);
 
String responseMessage = null;
 
while(true)
{
   QueueingConsumer.Delivery delivery = replyCustomer.nextDelivery();
   String messageCorrelationId = delivery.getProperties().getCorrelationId();
   if (messageCorrelationId != null && messageCorrelationId.equals(correlationId)) 
   {
       responseMessage = new String(delivery.getBody());
       System.out.println("The reply message's correlation id is:" + messageCorrelationId);
       break;
   }
}
if(responseMessage != null)
{
  System.out.println("The repsonse message is:'" + responseMessage + "'");
}
啟動修改後的生產者和消費者程式,我們從生產者的控制檯介面可以看到它接收到了消費者傳送的返回訊息。
消費者控制檯

生產者控制檯

消費者併發數設定

到目前為止,消費者Web應用消費訊息時,只有一個消費者接收並消費springMessageQueue佇列的訊息(如下圖所示)

如果傳送的訊息量比較大時,我們需要增加消費者的數目。

增加消費者數目要修改Message Listener Container的concurrentConsumers和maxConcurrentConsumers屬性,concurrentConsumers屬性是Message Listener Container建立時建立的消費者數目,maxConcurrentConsumers屬性是容器最大的消費者數目,我們下面把這兩個屬性都設定為5,使Message Listener Container中有5個消費者,同時修改CustomerConsumerTagStrategy類,在Tag中加入執行緒名,以區分不同的消費者。

<bean id="messageListenerContainer"
          class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        ............
        <property name="consumerTagStrategy" ref="consumerTagStrategy" />
        <property name="concurrentConsumers" value="5" />
        <property name="maxConcurrentConsumers" value="5" />
</bean>

public class CustomConsumerTagStrategy implements ConsumerTagStrategy
{
    @Override
    public String createConsumerTag(String queue) {
        String consumerName = "Consumer_" + Thread.currentThread().getName();
        return consumerName + "_" + queue;
    }
}
啟動消費者Web應用,從管理頁面可以看到連線springMessageQueue佇列的有5個消費者。

修改生產者程式,迴圈傳送50條訊息


ReplyConsumer replyCustomer = new ReplyConsumer(channel);
channel.basicConsume("springReplyMessageQueue",true,"Producer Reply Consumer", replyCustomer);
 
for(int i=0; i<50; i++)
{
   String correlationId = UUID.randomUUID().toString();
   String message = "Web RabbitMQ Message " + i;
 
   AMQP.BasicProperties props = 
                   new AMQP.BasicProperties
                        .Builder()
                        .contentType("text/plain")
                        .deliveryMode(2)
                        .correlationId(correlationId)
                        .replyTo("springReplyMessageExchange")
                        .build();
 
   channel.basicPublish("springMessageExchange","springMessage", props, message.getBytes());
}
在修改的生產者程式碼中,我們將Consumer程式碼抽出,定義了ReplyCustomer類

public class ReplyConsumer extends DefaultConsumer
{
    public ReplyConsumer(Channel channel)
    {
        super(channel);
    }
 
    @Override
    public void handleDelivery(String consumerTag,
                                         Envelope envelope,
                                         AMQP.BasicProperties properties,
                                         byte[] body)
            throws IOException
    {
        String consumerName = properties.getAppId();
        String replyMessageContent = new String(body, "UTF-8");
        System.out.println("The reply message's sender is:" + consumerName);
        System.out.println("The reply message is '" + replyMessageContent + "'");
    }
}
修改消費者的Message Listener訊息,將Consumer Tag作為引數,放在返回訊息的Properties中,返回給生產者。
public void onMessage(Message message, Channel channel)
{
 try 
 {
   String consumerTag = messageProperties.getConsumerTag();
   String replyMessageContent = consumerTag + " have received the message '" + messageContent + "'";
 
   AMQP.BasicProperties replyRabbitMQProps =
                    new AMQP.BasicProperties("text/plain",
                            "UTF-8",
                            null,
                            2,
                            0, rabbitMQProperties.getCorrelationId(), null, null,
                            null, null, null, null,
                            consumerTag, null); 
   .............    
修改消費者的CustomConsumerTagStrategy類,用“Consumer” + “_” + 執行緒名作為Consumer Tag。

public class CustomConsumerTagStrategy implements ConsumerTagStrategy
{
    @Override
    public String createConsumerTag(String queue) {
        String consumerName = "Consumer_" + Thread.currentThread().getName();
        return consumerName;
    }
}
修改完成後,啟動生產者和消費者程式,通過檢視生產者的控制檯輸出,我們可以看到多個消費者接收了生產者傳送的訊息,傳送了返回訊息給生產者。

消費者訊息預取數設定

上述的消費者Web應用中,每個消費者每次從佇列中獲取1條訊息,如果我們想讓每個消費者一次性從訊息佇列獲取多條訊息,需要修改Message Listener Container的prefetchCount屬性,這樣可以提高RabbitMQ的訊息處理吞吐量。

<bean id="messageListenerContainer"
          class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
          <property name="prefetchCount" value="5" />
</bean>
這個屬性值最終被設定為底層Rabbit Client的Channel介面的basicQos方法引數

/**
     * Request a specific prefetchCount "quality of service" settings
     * for this channel.
     *
     * @see #basicQos(int, int, boolean)
     * @param prefetchCount maximum number of messages that the server
     * will deliver, 0 if unlimited
     * @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchCount) throws IOException

這個方法設定從Channel上一次性可以讀取多少條訊息,我們在Container設定的PrefetchCount值為5,表示從一個消費者Channel上,一次性可以與預讀取5條訊息,按我們上面設定的5個消費者,5個消費者Channel計算,一次性可以預讀取25條訊息。為了證實這一點,我們修改消費者的程式碼,延長它處理一條訊息的時間。

需要說明的是,對於每個消費者而言,只有一條預取的訊息被接收且確認後,消費者才會再從訊息佇列中讀取訊息,並不是消費者在訊息沒有確認完成前,每次都從佇列裡預讀取prefetchCount條訊息。

public void onMessage(Message message, Channel channel) {
try 
    {
     ...........
     String messageContent = null;
     messageContent = new String(message.getBody(),"UTF-8");
     String consumerTag = messageProperties.getConsumerTag();
     String replyMessageContent = consumerTag + " have received the message '" + messageContent + "'";
 
     Thread.sleep(60000);
 
     ...........
     rabbitTemplate.send("springReplyMessageExchange","springReplyMessage", replyMessage);
     channel.basicAck(messageProperties.getDeliveryTag(), false); 

我們在onMessage方法中新增Thread.sleep(60000),使得處理一條訊息時間時間大於1分鐘,便於檢視訊息預取的效果,而且使用手動確認方式。

生產者程式改為一次性發送200條訊息。

啟動生產者程式,傳送200條訊息,我們可以看到springMessageQueue佇列裡有200條處於Ready狀態的訊息

啟動消費者程式,我們可以看到springMessageQueue佇列裡有25條訊息被預取了,Ready狀態的訊息從200條變成了175條,而未確認狀態的訊息數(Unacked列)變成了25條,即25條被預取,但是沒有被確認的訊息。

過了一段時間,等5個消費者確認了5條訊息後,又從訊息佇列預讀取了5條訊息,Ready狀態的訊息數變成170條,這時的訊息佇列的訊息數如下圖所示:


未確認的訊息數仍然是25條,但是總的訊息數變成了195條,表示已經有5條訊息被處理且確認了。

隨著訊息逐漸被處理,確認,消費者會逐漸從訊息佇列預取新的訊息,直到所有的訊息都被處理和確認完成。

rabbit標籤使用

上面的消費者Web應用使用了Spring傳統的beans元素定義,spring-rabbit提供了rabbit namespace,我們可以在applicationContext.xml中使用rabbit:xxx形式的元素標籤,簡化我們的xml配置。 我們首先在applicationContext.xml的namespace定義中新增rabbit namespace定義:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
         http://www.springframework.org/schema/util
         http://www.springframework.org/schema/util/spring-util-4.0.xsd
         http://www.springframework.org/schema/rabbit
         http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context-4.0.xsd" >
RabbitMQ Client ConnectionFactory的bean定義不需要修改,我們修改CachingConnectionFactory bean物件的定義
[html] view plain copy
<code class="language-html"><span style="font-size:10px;"><rabbit:connection-factory id ="connectionFactory" connection-factory="rabbitMQConnectionFactory" /></span></code>  
修改RabbitAdmin bean物件定義,使用rabbit:admin標籤
<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory" auto-startup="true"/>
修改rabbitTemplate定義,使用rabbit:template標籤
<rabbit:template connection-factory="connectionFactory" />
MessageConverter和MessageProperties物件沒有對應的rabbit標籤,仍然使用bean標籤。
修改Queue,Exchange和Bind定義,分別使用rabbit:queue,rabbit:exchange標籤,Bind的內容放到了Exchange bean定義內部。

<rabbit:queue id="springMessageQueue" name="springMessageQueue" auto-delete="false"
           durable="true" exclusive="false" auto-declare="false" declared-by="rabbitAdmin" />
 
<rabbit:direct-exchange id="springMessageExchange" name="springMessageExchange" durable="true"
                            auto-declare="false" auto-delete="false" declared-by="rabbitAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="springMessageQueue" key="springMessage"></rabbit:binding>
    </rabbit:bindings>
</rabbit:direct-exchange>
最後使用rabbit:listener-container修改Message Listener Container bean物件。
<rabbit:listener-container message-converter="serializerMessageConverter"
                               connection-factory="connectionFactory"
                               acknowledge="manual"
                               consumer-tag-strategy="consumerTagStrategy"
                               concurrency="5"
                               max-concurrency="5"
                               prefetch="5">
        <rabbit:listener ref="consumerListener" queues="springMessageQueue"/>
</rabbit:listener-container>
如果上面沒有建立queue的bean物件,這裡的rabbit:listener中的queues屬性也可以改成queueNames屬性
<rabbit:listener ref="consumerListener" queue-names="springMessageQueue"/>
這裡如果Listener關聯多個佇列,設定queues屬性或者queue-names屬性時可以用逗號進行分割,例如:

[html] view plain copy
<code class="language-html"><rabbit:listener ref="consumerListener" queue-names="messageQueue1,messageQueue2"/></code>  

使用rabbit標籤雖然可以簡化RabbitMQ相關物件的bean定義,但是它也有侷限性:

1)標籤對應的bean物件型別是固定的,例如rabbit:listener-container標籤對應的Listener Container是SimpleMessageListenerContainer類,如果我們想使用其他MessageListenerContainer類或者自定義Message Listener Container類,就不能使用rabbit標籤。

2)有的標籤無法設定id和name屬性,這樣一旦有多個同類型的bean物件定義時,就不能使用rabbit標籤。


RabbitMQ的Channel和Connection快取

spring-rabbit中的CachingConnectionFactory類提供了Connection和Channel級別的快取,如果我們沒有做任何設定,預設的快取模式是Channel模式,Channel快取預設最大數是25,所有的Channel複用一個Connection。我們在Message Listener Container中設定併發數為5,啟動消費者應用後,我們從管理介面可以看到一個消費者Connection,5個Channel。


重新啟動消費者應用後,我們可以看到有30個Channel被建立,但是隻能有25個Channel被快取,其他5個Channel只是臨時存在於記憶體中,一旦不被使用,會被自動銷燬,不會被回收到Channel快取池中被複用。

如果我們想修改Channel模式下的最大快取數的話,我們可以進行如下修改:

<rabbit:connection-factory id ="connectionFactory"
                                connection-factory="rabbitMQConnectionFactory"
                                cache-mode="CHANNEL"
                                channel-cache-size="30" />

我們也可以設定快取模式為Connection模式,設定最大連線快取數為10
<rabbit:connection-factory id ="connectionFactory"
                                connection-factory="rabbitMQConnectionFactory"
                                cache-mode="CONNECTION"
                                connection-cache-size="10" />
如果我們的Message Listener Container的消費者併發數小於最大快取數,例如為5,管理介面中只顯示有5個Connection,每個Connection上一條Channel。

如果消費者併發數大於最大快取數,例如併發數為20,會出現與併發數對應的連線數,但是隻有5個Connection能夠被快取,其他Connection,如果不再被使用,會被RabbitMQ自動銷燬。

我們還可以設定Connection的上限,使用CachingConnectionFactory的connectionLimit屬性

public class CachingConnectionFactory extends AbstractConnectionFactory
{
  ................
  private volatile int connectionLimit = Integer.MAX_VALUE;

這個屬性預設值是Integer.MAX_VALUE,可以理解為無上限,我們可以在applicationContext.xml中設定這個值為10。
<rabbit:connection-factory id ="connectionFactory"
                                connection-factory="rabbitMQConnectionFactory"
                                connection-limit="10"
                                cache-mode="CONNECTION"
                                connection-cache-size="10" />
此時如果Message Listener Container的Message Listener總併發數大於這個上限,會丟擲無法獲取連線的異常。
<rabbit:listener-container 
                               .............
                               concurrency="4"
                               max-concurrency="4">
        <rabbit:listener ref="Listener1" queues="messageQueue1"/>
    <rabbit:listener ref="Listener2" queues="messageQueue2"/>
    <rabbit:listener ref="Listener3" queues="messageQueue3"/>
</rabbit:listener-container>
例如上面的Container中,一共定義了三個Listener,每個Listener的併發數是4,總的併發數為12,超過了上線10,因此丟擲以下異常:


一月 03, 2017 10:15:28 上午 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer redeclareElementsIfNecessary
嚴重: Failed to check/redeclare auto-delete queue(s).
org.springframework.amqp.AmqpTimeoutException: Timed out attempting to get a connection
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:575)
    ..............
此時,消費者應用與RabbitMQ伺服器之間的Connection數只有上限數10條。


Spring AMQP的重連機制

我們在使用1中介紹了RabbitMQ Java Client提供的重連機制,Spring AMQP也提供了重連機制。我們可以使用Rabbit Java Client的重連設定,我們修改applicationContext.xml中“rabbitMQConnectionFactory”的重連屬性設定。

<bean id="rabbitMQConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
        ...................
        <property name="automaticRecoveryEnabled" value="true" />
        <property name="topologyRecoveryEnabled" value="true" />
        <property name="networkRecoveryInterval" value="60000" />
</bean>

我們啟動消費者應用程式,開啟管理頁面,可以看到消費者應用建立了5個Connection,每個Connection下分別建立了一個Channel,對應5個Consumer。


我們停止RabbitMQ伺服器,可以看到消費者控制檯輸出連線異常資訊,不停試圖恢復Consumer。


重新啟動RabbitMQ伺服器,從日誌資訊可以看出連線被重置,消費者被恢復。

登入管理介面,可以看到原先的5條Channel已經被恢復,但是本地連線埠號與之前的Channel不再一致。

點開一條Channel進去,可以看到連線Channel的Consumer Tag與最初的Consumer Tag也不一致,這可能是因為我們使用了自定義ConsumerTagStrategy,使用執行緒名為Tag名的原因。

我們也可以禁用RabbitMQ Java Client的重連設定,設定automaticRecoveryEnabled和topologyRecoveryEnabled屬性為false。

<bean id="rabbitMQConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
   <property name="automaticRecoveryEnabled" value="false" />
   <property name="topologyRecoveryEnabled" value="false" />
</bean>
我們再啟動消費者應用,可以看到初始有5個Connection,5個Channel,每個Channel對應一個Connection。


當我們重啟RabbitMQ伺服器後,發現只有4個Connection恢復,5個Channel被恢復,但是有兩個Channel複用同一個Connection,這一點與 使用RabbitMQ Java Client的重連機制時有所不同。

當執行RabbitMQ重連時,Message Listener Container也會對Consumer進行重新恢復,它的恢復間隔是由recoveryBackOff屬性決定的。


public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer
        implements ApplicationEventPublisherAware {
      ..........
      private BackOff recoveryBackOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);

SimpleMessageListenerContainer類的recoveryBackOff屬性物件有兩個屬性,一個是恢復間隔,預設值是DEFAULT_RECOVERY_INTERVAL常量(5000ms,即每5秒試圖進行一次恢復),還有一個嘗試恢復次數,預設值是FixedBackOff.UNLIMITED_ATTEMPTS(Long.MaxValue,可以認為是無限次嘗試)。我們可以根據需要 設定自己的recoveryBackOff屬性,例如下面我們把恢復間隔設定為60000ms,嘗試次數設定為100次。

<bean id="backOff" class="org.springframework.util.backoff.FixedBackOff">
        <constructor-arg name="interval" value="60000" />
        <constructor-arg name="maxAttempts" value="100" />
</bean>
<rabbit:listener-container message-converter="serializerMessageConverter"
                               ..........
                               recovery-back-off="backOff"> 
    <rabbit:listener ref="consumerListener" queues="springMessageQueue"/>
</rabbit:listener-container>
修改後啟動消費者應用,停掉RabbitMQ伺服器,我們從異常日誌可以看出Message Listener Container的重試間隔變成了1分鐘,而不是預設的5000ms。(為了便於檢視重試間隔起見,我們將Container的併發數調整為1)