1. 程式人生 > >rabbitmq介紹及rabbitmq在java中基礎使用

rabbitmq介紹及rabbitmq在java中基礎使用

RabbitMQ簡介
RabbitMQ是一個在AMQP基礎上完整的,可複用的企業訊息系統。它可以用於大型軟體系統各個模組之間的高效通訊,支援高併發,支援可擴充套件。使用Erlang語言編寫。

RabbitMQ相關術語:
          1.Broker:簡單來說就是訊息佇列伺服器實體。
          2.Exchange:訊息交換機,它指定訊息按什麼規則,路由到哪個佇列。
          3.Queue:訊息佇列載體,每個訊息都會被投入到一個或多個佇列。
          4.Binding:繫結,它的作用就是把exchange和queue按照路由規則繫結起來。
          5.Routing Key:路由關鍵字,exchange根據這個關鍵字進行訊息投遞。
          6.vhost:虛擬主機,一個broker裡可以開設多個vhost,用作不同使用者的許可權分離。
          7.producer:訊息生產者,就是投遞訊息的程式。
          8.consumer:訊息消費者,就是接受訊息的程式。
          9.channel:訊息通道,在客戶端的每個連線裡,可建立多個channel,每個channel代表一個會話任務。


RabbitMQ常用釋出訂閱模式的執行流程:
AMQP模型中,訊息在producer中產生,傳送到MQ的exchange上,exchange根據配置的路由方式發到相應的Queue上,Queue又將訊息傳送給consumer,訊息從queue到consumer有push和pull兩種方式。 訊息佇列的使用過程大概如下:

          1.客戶端連線到訊息佇列伺服器,開啟一個channel。
          2.客戶端宣告一個exchange,並設定相關屬性。
          3.客戶端宣告一個queue,並設定相關屬性。
          4.客戶端使用routing key,在exchange和queue之間建立好繫結關係。
          5.客戶端投遞訊息到exchange。

exchange接收到訊息後,就根據訊息的key和已經設定的binding,進行訊息路由,將訊息投遞到一個或多個佇列裡。 exchange也有幾個型別,下面會有介紹。


RabbitMQ教程:

引入maven依賴:

<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>3.6.5</version>
</dependency>

1.hello world!

生產者:

package com.rabbitmq.test.T_helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.test.util.ConnectionUtil;

/**
 * helloworld
 * @author lenovo
 *
 */
public class Producer {

	private final static String QUEUE_NAME = "test_queue";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 從連線中建立通道
        Channel channel = connection.createChannel();

        /*
         * 宣告(建立)佇列
         * 引數1:佇列名稱
         * 引數2:為true時server重啟佇列不會消失
         * 引數3:佇列是否是獨佔的,如果為true只能被一個connection使用,其他連線建立時會丟擲異常
         * 引數4:佇列不再使用時是否自動刪除(沒有連線,並且沒有未處理的訊息)
         * 引數5:建立佇列時的其他引數
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 訊息內容
        String message = "Hello World!";
        /*
         * 向server釋出一條訊息
         * 引數1:exchange名字,若為空則使用預設的exchange
         * 引數2:routing key
         * 引數3:其他的屬性
         * 引數4:訊息體
         * RabbitMQ預設有一個exchange,叫default exchange,它用一個空字串表示,它是direct exchange型別,
         * 任何發往這個exchange的訊息都會被路由到routing key的名字對應的佇列上,如果沒有對應的佇列,則訊息會被丟棄
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [生產者] Sent '" + message + "'");

        //關閉通道和連線
        channel.close();
        connection.close();
    }
}
消費者:
package com.rabbitmq.test.T_helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.test.util.ConnectionUtil;

public class Consumer {

	private final static String QUEUE_NAME = "test_queue";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 從連線中建立通道
        Channel channel = connection.createChannel();

        // 宣告佇列(如果你已經明確的知道有這個佇列,那麼下面這句程式碼可以註釋掉,如果不註釋掉的話,也可以理解為消費者必須監聽一個佇列,如果沒有就建立一個)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
       /*
         * 監聽佇列
         * 引數1:佇列名稱
         * 引數2:是否傳送ack包,不傳送ack訊息會持續在服務端儲存,直到收到ack。  可以通過channel.basicAck手動回覆ack
         * 引數3:消費者
         */ 
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [消費者] Received '" + message + "'");
        }
    }
}
2.Work模式
Work普通模式

生產者:

package com.rabbitmq.test.T_work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.test.util.ConnectionUtil;

/**
 * work模式
 * @author lenovo
 *
 */
public class Producer {

	private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 50; i++) {
            // 訊息內容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [生產者] Sent '" + message + "'");
            //傳送的訊息間隔越來越長
            Thread.sleep(i * 10);
        }

        channel.close();
        connection.close();
    }
}
消費者1:
package com.rabbitmq.test.T_work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.test.util.ConnectionUtil;

public class Consumer1 {

	private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // 同一時刻伺服器只會發一條訊息給消費者(能者多勞模式)
        //channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        /*
         * 監聽佇列,不自動返回ack包,下面手動返回
         * 如果不回覆,訊息不會在伺服器刪除
         */ 
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [消費者1] Received '" + message + "'");
            //休眠
            Thread.sleep(10);
            // 手動返回ack包確認狀態
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
          //channel.basicReject(); channel.basicNack(); //可以通過這兩個函式拒絕訊息,可以指定訊息在伺服器刪除還是繼續投遞給其他消費者
        }
    }
}
消費者2:
package com.rabbitmq.test.T_work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.test.util.ConnectionUtil;

public class Consumer2 {

	private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一時刻伺服器只會發一條訊息給消費者(能者多勞模式)
        //channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成狀態
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [消費者2] Received '" + message + "'");
            // 休眠1秒
            Thread.sleep(1000);

            //反饋訊息的消費狀態
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
測試結果:
1、消費者1和消費者2獲取到的訊息內容是不同的,同一個訊息只能被一個消費者獲取。
2、消費者1和消費者2獲取到的訊息的數量是相同的,一個是奇數一個是偶數。

其實,這樣是不合理的,應該是消費者1要比消費者2獲取到的訊息多才對。
Work的能者多勞模式
需要將上面兩個消費者的channel.basicQos(1);這行程式碼的註釋開啟,再次執行會發現,休眠時間短的消費者執行的任務多
訊息的確認
在以上的程式碼中,已經給出了註釋,如何使用自動確認和手動確認,消費者從佇列中獲取訊息,服務端如何知道訊息已經被消費呢?
模式1:自動確認
只要訊息從佇列中獲取,無論消費者獲取到訊息後是否成功訊息,都認為是訊息已經成功消費。
模式2:手動確認
消費者從佇列中獲取訊息後,伺服器會將該訊息標記為不可用狀態,等待消費者的反饋,如果消費者一直沒有反饋,那麼該訊息將一直處於不可用狀態。
如果選用自動確認,在消費者拿走訊息執行過程中出現宕機時,訊息可能就會丟失!!



3.訂閱模式



文章開頭有釋出訂閱的流程介紹

生產者:

package com.rabbitmq.test.T_pubsub;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.test.util.ConnectionUtil;

/**
 * 訂閱模式
 * @author lenovo
 *
 */
public class Producer {

	//交換機的名稱
	private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        /*
         * 宣告exchange(交換機)
         * 引數1:交換機名稱
         * 引數2:交換機型別
         * 引數3:交換機永續性,如果為true則伺服器重啟時不會丟失
         * 引數4:交換機在不被使用時是否刪除
         * 引數5:交換機的其他屬性
         */
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true,true,null);

        // 訊息內容
        String message = "訂閱訊息";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [生產者] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
消費者1:
package com.rabbitmq.test.T_pubsub;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.test.util.ConnectionUtil;

public class Consumer1 {

	private final static String QUEUE_NAME = "test_queue_exchange_1";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        
        /*
         * 繫結佇列到交換機(這個交換機的名稱一定要和上面的生產者交換機名稱相同)
         * 引數1:佇列的名稱
         * 引數2:交換機的名稱
         * 引數3:Routing Key
         * 
         */
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一時刻伺服器只會發一條訊息給消費者
        channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [消費者1] Received '" + message + "'");
            Thread.sleep(10);
            
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
消費者2:
package com.rabbitmq.test.T_pubsub;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.test.util.ConnectionUtil;

public class Consumer2 {

	private final static String QUEUE_NAME = "test_queue_exchange_2";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一時刻伺服器只會發一條訊息給消費者
        channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [消費者2] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

注意:訊息傳送到沒有佇列繫結的交換機時,訊息將丟失,因為,交換機沒有儲存訊息的能力,訊息只能存在在佇列中。


Exchange型別
Direct 、Fanout 、Topic  三種類型,RabbitMQ預設有一個exchange,叫default exchange,它用一個空字串表示,它是direct exchange型別。



下面介紹的路由模式和萬用字元模式都是屬於訂閱模式,只不過加入了Routing Key(路由鍵,文章開頭有介紹)。

3.1路由模式



生產者:

package com.rabbitmq.test.T_routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.test.util.ConnectionUtil;

/**
 * 路由模式
 * @author lenovo
 *
 */
public class Producer {

	private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // 訊息內容
        String message = "這是訊息B";
        channel.basicPublish(EXCHANGE_NAME, "B", null, message.getBytes());
        System.out.println(" [生產者] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
消費者1:
package com.rabbitmq.test.T_routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.test.util.ConnectionUtil;

public class Consumer1 {

	private final static String QUEUE_NAME = "test_queue_direct_1";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        /*
         * 繫結佇列到交換機
         * 引數1:佇列的名稱
         * 引數2:交換機的名稱
         * 引數3:routingKey
         */
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "A");

        // 同一時刻伺服器只會發一條訊息給消費者
        channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [消費者1] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
消費者2:
package com.rabbitmq.test.T_routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.test.util.ConnectionUtil;

public class Consumer2 {

	private final static String QUEUE_NAME = "test_queue_direct_2";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "B");
        //如果想讓消費者2同時接受routingKey為A 和為B的訊息,只要在下面在此新增一個Bing就可以了
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "A");

        // 同一時刻伺服器只會發一條訊息給消費者
        channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [消費者2] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

3.2萬用字元模式
將路由鍵和某模式進行匹配。此時佇列需要繫結要一個模式上。符號“#”匹配一個或多個詞,符號“*”只能匹配一個詞。因此“audit.#”能夠匹配到“audit.irs”和“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。


生產者:

package com.rabbitmq.test.T_topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.test.util.ConnectionUtil;

/**
 * 通配模式
 * @author lenovo
 *
 */
public class Producer {

	private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 訊息內容  模擬 有人購物下訂單
        String message = "新增訂單:id=101";
        channel.basicPublish(EXCHANGE_NAME, "order.insert", null, message.getBytes());
        System.out.println(" [生產者] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
	
}
消費者1:
package com.rabbitmq.test.T_topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.test.util.ConnectionUtil;

public class Consumer1 {

	private final static String QUEUE_NAME = "test_queue_topic_1";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "order.#");

        // 同一時刻伺服器只會發一條訊息給消費者
        channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [財務系統] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
消費者2:
package com.rabbitmq.test.T_topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.test.util.ConnectionUtil;

public class Consumer2 {

	private final static String QUEUE_NAME = "test_queue_topic_2";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "order.insert");

        // 同一時刻伺服器只會發一條訊息給消費者
        channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [物流系統] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}