1. 程式人生 > >RabbitMQ(3) Java客戶端使用

RabbitMQ(3) Java客戶端使用

裏的 新的 tool int else build 實現 綁定 them

RabbitMQ針對不同的開發語言(java,python,c/++,Go等等),提供了豐富對客戶端,方便使用。就Java而言,可供使用的客戶端有RabbitMQ Java client、
RabbitMQ JMS client、apache的camel-rabbitmq、以及Banyan等。在Spring中,也可以使用Spring AMQP、Spring Cloud Data Flow方便對集成RabbitMQ。
實際開發使用中,RabbitMQ Java client和Spring AMQP比較常用。RabbitMQ Java client在使用上更加接近AMQP協議,Spring AMQP則更加方便Spring項目中的集成。本為總結RabbitMQ Java client的使用。

Java客戶端概覽

RabbitMQ的Java客戶端包為amqp-client-{version}.jar,可以從RabbitMQ官網下載後引入項目中。

對於Maven工程,pom.xml中加入以下依賴即可引入RabbitMQ的Java客戶端:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.2.0</version>
</dependency>

amqp-client客戶端主要包含以下包:

說明
com.rabbitmq.client 客戶端api定義,提供了接口和類定義AMQP中connection,channel,queue,exchange等核心對象
com.rabbitmq.client.impl 客戶端具體實現
com.rabbitmq.client.impl.nio 客戶端nio實現
com.rabbitmq.client.impl.recovery
com.rabbitmq.tool.json 對象json工具類
com.rabbitmq.tool.jsonrpc 基於AMQP等json-rpc支持
com.rabbitmq.util 客戶端中使用等工具累

對普通用戶而言,一般只需關註com.rabbitmq.client包,其中定了AMQP協議中對基礎對象,包含以下主要接口和類:

  • Channel: AMQP 0-9-1 Channel對象,表示一個連接通道,提供了大多數AMQP操作,如創建隊列、交換器、綁定等
  • Connection: AMQP 0-9-1 Connection,表示一個客戶端連接
  • ConnectionFactory: Connectiong工廠
  • Consumer: 消息消費者接口
  • DefaultConsumer: 消費者接口默認實現
  • BasicProperties: 消息屬性對象,用於發送消息時設置消息屬性
  • BasicProperties.Builder: BasicProperties構建器

使用ConnectionFactory創建出Connection對象,在使用Connection對象創建一個Channel,在Channel上即可完成基本的發送消息,消費消息等AMQP操作;
發送消息時,可通過BasicProperties設置消息屬性;可以通過實現Consumer接口或繼承DefaultConsumer類實現一個消費者來消費消息。總之,通過以上對象,
即可完成基本的消息從生產到消費的全流程。

連接到RabbitMQ

通過ConnectionFactory工廠方法,設置連接屬性,生成Connection對象,建立客戶端到RabbitMQ的連接。在Connection上創建Channel,建立一個連接通道。
Channel不是線程安全的,在實際使用中,應該通過Connection為每個線程創建獨立的Channel。

#設置連接屬性。未設置時使用默認值:使用默認賬號guest連接到localhost到默認vhost "/"
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

#生成Connection & Channel
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

也可以通過設置URI的方式來建立連接:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri("amqp://username:password@hostname:port/vhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

Channel接口上定義AMQP協議幾乎所有的操作。建立好到RabbitMQ到連接後,就可以在Channel對象上執行AMQP的操作,如聲明隊列、交換器、綁定等。

操作exchange、queue和binding

queue操作

聲明隊列

Channel定義來以下三組方法來聲明隊列:

  1. 普通的queueDeclare方,有兩個重載版本

    Queue.DeclareOk queueDeclare()
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)

    第一個不帶參數的queueDeclare()方法聲明一個隊列,隊列名稱由rabbitMQ自動生成,該隊列事非持久的、排他的、自動刪除的;

    第二個方法聲明隊列,可以指定用戶設定的隊列屬性和參數,是最常用的方法。其中各個參數含義如下:
    • queue: 隊列名稱
    • durable: 隊列是否持久話。持久化以為著隊列可以從RabbitMQ重啟中恢復。
    • exclusive: 排他性。排他性的隊列只對首次聲明它的連接(Connection而不是Channel)可見,並將在連接斷開是自動刪除隊列。排他性的隊列被首次聲明後,
      其他連接是不允許創建同名隊列的,這種類型的隊列使用場景很有限。
    • autoDelete: 隊列是否自動刪除。自動刪除的前提是,至少有一個消費者連接到該隊列,而後由斷開來連接,隊列沒有任何消費者時,隊列被自動刪除。
  2. queueDeclareNoWait方法
    void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
    使用queueDeclareNoWait方法聲明隊列時,不等待服務端到響應,直接返回。這種情況下,聲明完隊列後立即使用可能引發異常。
  3. queueDeclarePassive方法
    Queue.DeclareOk queueDeclarePassive(String queue)
    最後一個queueDeclarePassive方法不是真正對聲明隊列,而只是檢查隊列是否存在,如果隊列存在則正常返回;否則會拋出一個異常,並且執行該操作對Channel不再
    可用,後續應該創建新的Channel對象使用。

刪除隊列

刪除隊列有三個方法:

  1. 直接刪除
    Queue.DeleteOk queueDelete(String queue) throws IOException;
    該方法會直接刪除掉指定的隊列,而不隊列對狀態,如對是否正在使用、隊列中是否還有數據等。
  2. 按需刪除
    Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException; void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
    指定ifUnused為true,則只有當隊列未使用是才會被刪除;指定ifEmpty,則只有當隊列為空,裏面沒數據是才會被刪除。
  3. 清空隊列
    queuePurge不刪除隊列,而是清空隊列中數據。
    Queue.PurgeOk queuePurge(String queue) throws IOException;

exchange操作

聲明exchange

聲明exchange的方法也分為三組:

  1. 普通的exchangeDeclare方法

    Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
    Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;
    Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
    Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException;
    Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;
    Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;
    Exchange.DeclareOk exchangeDeclare(String exchange,
                                       String type,
                                       boolean durable,
                                       boolean autoDelete,
                                       boolean internal,
                                       Map<String, Object> arguments) throws IOException;                                           
    Exchange.DeclareOk exchangeDeclare(String exchange,
                                       BuiltinExchangeType type,
                                       boolean durable,
                                       boolean autoDelete,
                                       boolean internal,
                                       Map<String, Object> arguments) throws IOException;
    
    各個參數含義如下:
    • exchange: 交換器的名稱
    • type/BuiltinExchangeType: 交換器的類型
    • durable: 是否持久化。持久化的交換器會從RabbitMQ服務重啟中恢復,而不用重新創建。
    • autoDelete: 是否自動刪除。自動刪除的前提是,只是有一隊列或交換器與該交換器綁定,之後所有與該交換器綁定的隊列/交換器都進行來解綁。
    • internal: 是否為內置交換器。內置交換器是不允許客戶端發送消息的。內置交換使用的場景是與其他交換器綁定(RabbitMQ擴展,非AMQP原生功能)
    • arguments: 其他的結構化參數
  2. exchangeDeclareNoWait方法
    使用exchangeDeclareNoWait方法聲明exchange,方法調用不等待服務端的響應,直接返回,各個參數含義與上面相同。所以聲明exchange後立即使用,很可能引發異常。

    void exchangeDeclareNoWait(String exchange,
                               String type,
                               boolean durable,
                               boolean autoDelete,
                               boolean internal,
                               Map<String, Object> arguments) throws IOException; 
    void exchangeDeclareNoWait(String exchange,
                               BuiltinExchangeType type,
                               boolean durable,
                               boolean autoDelete,
                               boolean internal,
                               Map<String, Object> arguments) throws IOException; 
  3. exchangeDeclarePassive方法
    與queueDeclarePassive方法類似,exchangeDeclarePassive用來檢查exchange是否存在,而不會創建exchange。

    Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;     

    刪除exchange

    與刪除隊列類似,可以直接刪除交換器或是按需刪除。
  4. 直接刪除
    Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
  5. 按需刪除
    Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException; void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
    ifUnused設置為true時,只有當交換器未被使用時,才會被刪除。

binding操作

隊列與交換器綁定/解綁

把隊列和交換器綁定起來,使用queueBind方法:

Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

被綁定的隊列和交換器的名稱分別由參數queue和exchange指定,綁定的key通過參數routingKey指定。arguments指定了其他的結構化參數。
對應於綁定方法,有如下兩個解綁方法:

Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

交換器與交換器綁定/解綁

作為RabbitMQ對AMQP的擴展功能,交換器允許和別的交換器綁定起來。這和隊列與交換器對綁定在使用上沒有太大對不同。可以將交換器E1和E2綁定起來,在將E2和隊列
Q綁定起來,之後生產者向E1發送消息,消息在E1上被路由到E2,再由E2路由到Q。
綁定交換器和其他交換器使用如下方法:

Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

destination和source參數分別指定目的交換器和源交換器。
同樣,通過以下方法解綁:

Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException;
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

void exchangeUnbindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

何時需要創建隊列、交換器以及它們之間的綁定關系呢?這取決與應用程序。在一些復雜應用中,可能需要管理員事先規劃好交換器、隊列以及綁定關系,並在服務隊創建好
這些資源,客戶端直接去使用即可,這樣便於統一規劃,防止客戶端隨意創建資源造成錯誤或是資源浪費;在其他一些應用中,可能由客戶端自己創建這些資源並使用。總之,
這取決於應用本身的規劃。

發送消息

在channel對象上,使用basicPublish方法,將消息發送到指定的交換器。basicPublish有三個重載方法:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

發送消息是,指定的各個參數含義如下:

  • exchange: exchange名稱,指定消息發送到哪個交換器
  • routingKey: 消息到路由鍵
  • mandatory: 該參數決定消息到達exchange後無法路由時到處理方式。指定為true,當消息到達exchange時,如果根據路由鍵找不到匹配到隊列,該消息將被返回給
    客戶端;否則,無法路由到消息被直接丟棄。
  • immediate: 該參數設置為true,消息在從exchange路由到隊列時,如果隊列上消費者,消息將被投遞到隊列,否則將消息返回給生產者。不過,該參數從RabbitMQ 3.0版本開始已經廢棄掉。
  • props: 指定消息到其他屬性,如header、deliveryMode、contentType等。
  • body: 消息體

消費消息

在RabbitMQ中,消費者可以以推模式和拉模式兩種方式消費消息,分別對象basicConsume方法和basicGet方法。推模式中,由服務端主動推送消息到消費這,消費者在
會調方法中處理消息,然後響應服務端;拉模式中,消費者每次主動從隊列中拉起單條消息消費,然後響應服務端。

推模式

推模式使用basicConsume方法來消費消息,可以看到,RabbitMQ定義了basicConsume方法許多重載版本:

String basicConsume(String queue, Consumer callback) throws IOException;
String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;

各個參數含義如下:

  • queue: 指定隊列名稱,即消費哪個隊列裏的數據。
  • autoAck: 是否自動進行消息確認。設置為true表示自動消息確認,消費者收到消息時,自動向服務端發送ack信號,服務端收到該信號,則認為消息已經被消費,將其從隊列中刪除。
    設置為false值,需要消費者手動進行ack。
  • consumerTag: 消費者標簽,用了區分多個消費者。
  • noLocal: 設置為true,表示不能將同一個Connection中生成者發送的消息傳遞給這個Connection中的消費者。
  • exclusive: 是否排他。
  • arguments: 其他結構化參數。
  • callback: 消費者會調函數,處理發送過來的消息。
  • cancelCallback: 消費者取消訂閱時的回調方法。
  • shutdownSignalCallback: channel或者connection關閉時的回調方法。
  • 其他回調方法...

可見,消費者主要還是定義各個回調方法,然後調用basicConsume方法消費消息。最主要的回調參數還是callback,定了了消費者消費消息的主要流程,可以同實現
Consumer接口或繼承DefaultConsumer類來實現一個Consumer,如:

String QUEUE_NAME ="";
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,"myTag",
                new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties properties,
                                               byte[] body) throws IOException{
                       long deliveryTag = envelope.getDeliveryTag();
                       process(body,consumerTag,envelope,properties) //消費消息
                       channel.basicAck(deliveryTag,false);
                    }
                });

拉模式

拉模式只有basicGet一個方法:

GetResponse basicGet(String queue, boolean autoAck) throws IOException;

參數queue指定從哪個隊列中讀取消息;參數autoAck指定是否自動進行消息確認,設定為fasle時,需要消費者對消息消費後,主動向服務端發送ack消息。

GetResponse response = channel.basicGet(QUEUE_NAME, false);
handle(response) //消費消息
channel.basicAck(response.getEnvelope.getDeliveryTag,false);

拒絕消息

消費者可以拒絕消費投遞過來的消息,使用basicReject拒絕單條消息,使用basicNack可以具體多條消息。basicNack不是AMQP的標準操作,而是RabbitMQ的擴展。

void basicReject(long deliveryTag, boolean requeue) throws IOException;
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

各個參數含義:

  • deliveryTag: 消息的編號
  • requeue: 消費者拒絕後,服務端是否將消息重新入隊
  • multiple: basicNack,multiple參數設置為true時,表明一次性拒絕多條消息:deliveryTag編號之前的所有未被消費的消息都被拒絕。

一個完整的示例

我們將實現一個簡單的告警處理系統。告警分為io告警和file告警兩種類型,告警級別有error和warning兩種。
整體結構如下圖所示
技術分享圖片

生產者將告警發送到topic類型的交換器中,交換器更具告警的級別將告警路由分別路由到ErrorQueue和WarningQueue中。消費者ErrorConsumer訂閱ErrorQueue
消費error級別的告警消息;IOWarningConsumer訂閱WarningQueue,處理warning級別到告警,正如其名稱,IOWarningConsumer只對io類型到告警消費,其他類型
的告警reject掉。

實現如下:

  • Producer: 隨機生產各自類型、級別的告警,並發送到交換器

    public class Producer {
        public static String EXCHANEG = "alert_exchange";
        public static String WARNING_QUEUE = "waring_queue";
        public static String ERROR_QUEUE = "error_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //get a default connection and create a channel
            ConnectionFactory cf = new ConnectionFactory();
            Connection connection = cf.newConnection();
            final Channel channel = connection.createChannel();
    
            //declare exchange,queues and bindings
            channel.exchangeDeclare(EXCHANEG, BuiltinExchangeType.TOPIC);
            channel.queueDeclare(WARNING_QUEUE,true,false,false,null);
            channel.queueDeclare(ERROR_QUEUE,true,false,false,null);
            channel.queueBind(WARNING_QUEUE, EXCHANEG,"*.warning");
            channel.queueBind(ERROR_QUEUE, EXCHANEG,"*.error");
    
            while (true){
                String key = randomRoutingKey();
                String message = message(key);
                channel.basicPublish(EXCHANEG,key,null,message.getBytes());
                System.out.println("Sent > " + message);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static String[] keys = new String[]{"io.error","io.warning","file.error","file.warning"};
    
        public static String randomRoutingKey(){
            Random random = new Random();
            int rand = Math.abs(random.nextInt())%(keys.length);
            return keys[rand];
        }
    
        public static String message(String key){
            String message = "Alert ..." + " level:" + key;
            return message;
        }
    }
  • ErrorConsumer: 消費error級別到告警

    public class ErrorConsumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory cf = new ConnectionFactory();
            Connection connection = cf.newConnection();
            final Channel channel = connection.createChannel();
    
            boolean autoAck = false;
            channel.basicConsume(Producer.ERROR_QUEUE,false,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException{
                    String message = new String(body);
                    //consume message
                    System.out.println("consumed. " + message);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            });
        }
    }
  • IOWarningConsumer: 消費io類型、warning級別的告警。

     public class IOWarningConsumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory cf = new ConnectionFactory();
            Connection connection = cf.newConnection();
            final Channel channel = connection.createChannel();
    
            boolean autoAck = false;
            channel.basicConsume(Producer.WARNING_QUEUE,autoAck,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException{
                    String key = envelope.getRoutingKey();
                    String message = new String(body);
                    long deliveryTag = envelope.getDeliveryTag();
                    if (key.startsWith("io")) { // only consumer io warning messages
                        //consume message
                        System.out.println("consumed: " + message);
                        channel.basicAck(deliveryTag,false);
    
                    } else { //reject other messages and requeue them
                        System.out.println("rejected: " + message);
                        channel.basicReject(deliveryTag,true);
                    }
                }
            });
    
        }
    }

分別啟動Producer、ErrorConsumer和IOWarningConsumer。 從輸出結果中可以看到:

  • Producer每隔一秒鐘隨機生產一條消息:

    Sent > Alert ... level:io.error
    Sent > Alert ... level:file.warning
    Sent > Alert ... level:file.error
    Sent > Alert ... level:io.warning
    Sent > Alert ... level:file.warning
    Sent > Alert ... level:io.error
    Sent > Alert ... level:file.error
    Sent > Alert ... level:file.error
    Sent > Alert ... level:io.warning
    Sent > Alert ... level:file.warning
    Sent > Alert ... level:file.error
  • ErrorConsumer處理了所有error級別的消息:

    consumed. Alert ... level:io.error
    consumed. Alert ... level:file.error
    consumed. Alert ... level:io.error
    consumed. Alert ... level:file.error
    consumed. Alert ... level:file.error
    consumed. Alert ... level:file.error
  • IOWarningConsumer消費了io類型的warning消息,拒絕了其他消息。

    rejected: Alert ... level:file.warning  
    consumed: Alert ... level:io.warning  
    rejected: Alert ... level:file.warning  
    consumed: Alert ... level:io.warning    
    rejected: Alert ... level:file.warning   
    rejected: Alert... level:file.warning   
    ...

    在IOWrningConsumer,對於reject的消息,其requeue參數設置為true,這樣消息被拒絕後,在服務端將重新入隊,然後又推送給該消費者,一直循環。實際
    應用中,應該避免這種情況到發生,此處只是為了演示reject的用法。
    查看隊列狀態,可以發現warning_queue中始終存在未被確認的消息:

    $ rabbitmqctl list_queues name,durable,messages_ready,messages_unacknowledged,messages
    Timeout: 60.0 seconds ...
    Listing queues for vhost / ...
    error_queue     true    0   0   0
    task_queue      true    0   0   0
    waring_queue    true    19  0   19

    總結

    使用java客戶端,創建ConnectionFactory對象,設置連接屬性後,創建已經到服務到Connection,Connection代表一個Tcp連接,可以在客戶端復用,復用方式
    為使用Connection創建Channel。可以根據規劃事先在服務端創建好交換器、隊列等資源,也可以在Channel上聲明這些資源,聲明資源時,如果資源不存在則會創建。
    一切資源就緒,生產者在Channel使用basicPublish發送消息到指定到交換器,消費這訂閱指定的隊列,定義消費消息的回調函數,在Channel上使用basicConsume消費消息。

RabbitMQ(3) Java客戶端使用